Sorry, I realized that I forgot to include one additional piece, our 
EdiTransactionSetFactory; here it is:

package supplies.facility.edi.helpers;

import org.apache.commons.lang.StringUtils;
import org.codehaus.jackson.map.ObjectMapper;
import supplies.facility.edi.models.EdiTransactionSet;

import java.nio.ByteBuffer;

/**
* Created by Bob on 2/21/2016.
*/
public class EdiTransactionSetFactory {

    private EdiTransactionSetFactory() {}

    public static EdiTransactionSet createEdiTransactionSet(ByteBuffer data) {
        try {
            byte[] bytes = new byte[data.remaining()];
            data.get(bytes);
            // Decode with an explicit charset; the no-argument String(byte[])
            // constructor would use the platform default
            String jsonString = new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
            return createEdiTransactionSet(jsonString);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static EdiTransactionSet createEdiTransactionSet(String jsonString) {
        EdiTransactionSet obj = null;
        try {
            if (StringUtils.isEmpty(jsonString)) {
                jsonString = "{}";
            }
            ObjectMapper mapper = new ObjectMapper();
            obj = mapper.readValue(jsonString, EdiTransactionSet.class);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return obj;
    }
}
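
One detail worth calling out in the ByteBuffer handling above: decoding bytes to a String should name an explicit charset, since the no-argument String(byte[]) constructor uses the platform default encoding, which can differ between the machine that wrote the stream and the machine that reads it. A minimal, self-contained sketch of just the decoding step (plain JDK, no Jackson; the class name here is mine, not part of the project):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class DecodeDemo {

    // Drain the remaining bytes of a ByteBuffer and decode them as UTF-8 text
    static String decode(ByteBuffer data) {
        byte[] bytes = new byte[data.remaining()];
        data.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap("{\"id\":1}".getBytes(StandardCharsets.UTF_8));
        System.out.println(decode(buf));  // prints {"id":1}
    }
}
```

Pinning UTF-8 at both the producer and the consumer keeps the JSON round-trip deterministic regardless of each host's locale settings.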


From: Jim
Sent: Tuesday, May 10, 2016 12:42 PM
To: users@apex.incubator.apache.org
Subject: FW: Problems with Kinesis Stream


Hi Pradeep,

Here is an extract from my pom.xml with the relevant entries:

    <!-- change this if you desire to use a different version of DataTorrent -->
    <malhar.version>3.3.1-incubating</malhar.version>
    <apex.version>3.3.0-incubating</apex.version>

    <datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
    <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
    <datatorrent.appconf.files>files/*</datatorrent.appconf.files>

    <maven-enforcer-plugin.version>[1.0,2.0-alpha)</maven-enforcer-plugin.version>

    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk-kinesis</artifactId>
      <version>1.10.76</version>
    </dependency>

Everything was in sync with your demo application.

I did see in your demo application that you created your own 
StringKinesisInputOperator where you override the getTuple function.

In our app we create a KinesisEdiTransactionSetInputOperator, but we do not 
override the getTuple() function.  Is there any reason that we should do that?

Note that we have some database access where we save the current shard 
position to a database table.  If the application restarts, we can use 
setInitialOffset("earliest") and have a custom shardManager that reads this 
table and sets the initial position to that location, so that we don't 
duplicate completed transactions and still pick up transactions that were 
added to the queue while the system was down.  We need this for 
restartability, in case we update the application and it starts under a new 
application id, so we don't have to worry about missing or duplicating 
records (I think).
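
The recovery rule described here, use the persisted sequence for a shard when one was saved and otherwise fall back to the configured offset, can be sketched in plain Java. The names below are hypothetical for illustration, not the actual Malhar ShardManager API:

```java
import java.util.HashMap;
import java.util.Map;

public class ShardRecoveryDemo {

    // Choose the starting position for a shard: the persisted sequence
    // number if one was saved, otherwise the configured default offset
    static String startingPosition(Map<String, String> persisted,
                                   String shardId, String fallback) {
        String seq = persisted.get(shardId);
        return seq != null ? seq : fallback;
    }

    public static void main(String[] args) {
        Map<String, String> persisted = new HashMap<String, String>();
        persisted.put("shard-0", "seq-4711");

        System.out.println(startingPosition(persisted, "shard-0", "earliest")); // seq-4711
        System.out.println(startingPosition(persisted, "shard-1", "earliest")); // earliest
    }
}
```

In other words, a brand-new shard (or a fresh application id) starts from "earliest", while a shard we have already checkpointed resumes just past the last committed sequence.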

Here is our class; does anything stand out as missing?



package supplies.facility.edi.helpers;

import com.amazonaws.services.kinesis.model.Record;
import com.datatorrent.api.Context;
import com.datatorrent.contrib.kinesis.AbstractKinesisInputOperator;
import supplies.facility.edi.models.EdiTransactionSet;
import supplies.facility.edi.models.OptionalOperatorProperties;

import java.sql.PreparedStatement;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by Bob on 2/21/2016.
 */
public class KinesisEdiTransactionSetInputOperator extends AbstractKinesisInputOperator<EdiTransactionSet> {

    Map<String, String> previousPosition = new HashMap<String, String>();

    public KinesisEdiTransactionSetInputOperator() {
        super();
    }

    private OptionalOperatorProperties optionalProperties = new OptionalOperatorProperties();

    public OptionalOperatorProperties getOptionalProperties() {
        return optionalProperties;
    }

    public void setOptionalProperties(OptionalOperatorProperties optionalProperties) {
        this.optionalProperties = optionalProperties;
    }

    @Override
    public EdiTransactionSet getTuple(Record record) {
        try {
            return EdiTransactionSetFactory.createEdiTransactionSet(record.getData());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void endWindow() {
        super.endWindow();

        // Persist the current shard positions so that we can recover if we need
        // to restart the application, with no risk of duplicate transactions
        try {
            if (!previousPosition.equals(shardPosition)) {
                // Upsert every entry in the shard map (PostgreSQL ON CONFLICT syntax)
                PreparedStatement statement = getOptionalProperties().getEdiJdbc().getConnection().prepareStatement(
                        "insert into kinesis_shard_manager values(?, ?) on conflict on constraint "
                        + "pk_kinesis_shard_manager do update set sequence = excluded.sequence");
                for (Map.Entry<String, String> entry : shardPosition.entrySet()) {
                    statement.setString(1, entry.getKey());
                    statement.setString(2, entry.getValue());
                    statement.executeUpdate();
                }
                statement.close();
                getOptionalProperties().getEdiJdbc().getConnection().commit();

                // Remember what we persisted; copy the map rather than aliasing it,
                // otherwise the equals() check above could never detect a change
                previousPosition = new HashMap<String, String>(shardPosition);
            }
        }
        catch (Exception e) {
            throw new RuntimeException("Exception while persisting the shard positions", e);
        }
    }

    @Override
    public void teardown() {
        super.teardown();

        // Disconnect from the edi jdbcStore
        this.optionalProperties.getEdiJdbc().disconnect();
    }

    @Override
    public void setup(Context.OperatorContext context) {

        super.setup(context);

        try {
            // Connect to the EDI jdbcStore for this operator
            this.optionalProperties.getEdiJdbc().connect();
            this.optionalProperties.getEdiJdbc().getConnection().setAutoCommit(false);
        } catch (Exception e) {
            throw new RuntimeException("Exception setting up the EDI input operator", e);
        }
    }
}
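
One subtle point worth flagging in endWindow(): if previousPosition is assigned the shardPosition reference directly rather than a copy, both names point at the same map object, the equals() check can never see a difference afterward, and positions silently stop being persisted. A small stand-alone illustration of the aliasing pitfall (plain JDK; the variable names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class AliasDemo {
    public static void main(String[] args) {
        Map<String, String> shardPosition = new HashMap<String, String>();
        shardPosition.put("shard-0", "seq-100");

        // Aliasing: previousPosition and shardPosition are the same object,
        // so the two can never compare unequal
        Map<String, String> previousPosition = shardPosition;
        shardPosition.put("shard-0", "seq-200");
        System.out.println(previousPosition.equals(shardPosition)); // true - the change is invisible

        // Defensive copy: a later mutation is detected
        previousPosition = new HashMap<String, String>(shardPosition);
        shardPosition.put("shard-0", "seq-300");
        System.out.println(previousPosition.equals(shardPosition)); // false - the change is detected
    }
}
```

The defensive copy (new HashMap<>(shardPosition)) is cheap relative to a JDBC round trip, and it is what makes the "only write when something changed" optimization actually fire.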


From: Pradeep Dalvi [mailto:pradeep.da...@datatorrent.com]
Sent: Tuesday, May 10, 2016 6:18 AM
To: Jim <jim@facility.supplies<mailto:jim@facility.supplies>>
Subject: Re: Problems with Kinesis Stream

Hi Jim,

I don't have much background on the Kinesis operator, so I will coordinate 
with the right set of engineers and help you resolve the issues on priority.
I was with Chaitanya to try out different options. We also tried restarting the 
application.
We would like to know a little more detail about the scenarios, setup, and RTS 
version being used. As far as I recall from our last conversation, we had 
suggested using DT RTS 3.3.0-RC5.
Shall we schedule a call for this? Please feel free to suggest convenient 
timings. I shall check with Chaitanya.

Thanks,
Pradeep A. Dalvi

On Tue, May 10, 2016 at 3:51 PM, Jim 
<jim@facility.supplies<mailto:jim@facility.supplies>> wrote:
Hi Chaitanya,

And you are adding new records, and seeing them flow in?

And if you restart the application with it set to "earliest", does it always 
retrieve all the records, then continue to get new ones as they are added?

We had two different developers on two different machines get the same kind of 
errors.

I will compare our applications to yours to see what could be different.

Jim

From: Chaitanya Chebolu 
[mailto:chaita...@datatorrent.com<mailto:chaita...@datatorrent.com>]
Sent: Tuesday, May 10, 2016 5:17 AM
To: users@apex.incubator.apache.org<mailto:users@apex.incubator.apache.org>
Subject: Re: Problems with Kinesis Stream

Hi Jim,

    I created a sample application with the same dependencies you 
specified.
    I ran it with both the earliest and latest offsets, and it is working fine. 
I haven't observed any record loss in either scenario.
    We tried two different setups: local as well as on AWS.
    Please find the pom and application at the location below:
https://gist.github.com/chaithu14/b7748af582e451eed8082995487f61d7


Regards,
Chaitanya

On Tue, May 10, 2016 at 1:18 PM, Jim 
<jim@facility.supplies<mailto:jim@facility.supplies>> wrote:
Hello,

I am using Apex in an application where one of the inputs is an AWS Kinesis 
stream.  I am using this over AWS SQS because I need to guarantee that items 
are processed in the order in which they are received by the system.

I have an operator that uses the base AbstractKinesisInputOperator.

We are currently using:

                Apex version 3.3.0-incubating
                Malhar version 3.3.1-incubating
                aws-java-sdk-kinesis version 1.10.76

I am noticing strange behavior of this operator and cannot pin down where the 
issue is coming from, if it is from the AWS sdk, or from the datatorrent apex 
module.

Here is what I am seeing:

1.)    When I setInitialOffset("latest") and start the application, I do not 
always see the next transactions processed.  From what I can see it is very 
hit or miss which transactions are actually read and sent through for 
processing.  I have no idea why.

2.)    When I setInitialOffset("earliest"), it does seem to pick up all 
records and read new ones, for a while.  Then at some point it stops 
processing new records, and even if I restart the application so that it 
should start again from the beginning, nothing is read in.  If I then delete 
and recreate the Kinesis stream, it works again for a while, until it stops 
again.


Is anyone successfully using a Kinesis stream to process records, and 
confident that no issues like this are occurring and no transactions are 
being missed?

Can someone at apex/datatorrent look into this and help figure out what is 
happening and how we can fix it?

Note that I am just about ready to go live, and would like to get this resolved 
this week!

Thanks,

Jim




--
Pradeep A. Dalvi
Software Engineer
DataTorrent (India)
