Sorry, I realized that I forgot to include one additional piece, our EdiTransactionSetFactory; here it is:
package supplies.facility.edi.helpers;

import org.apache.commons.lang.StringUtils;
import org.codehaus.jackson.map.ObjectMapper;
import supplies.facility.edi.models.EdiTransactionSet;

import java.nio.ByteBuffer;

/**
 * Created by Bob on 2/21/2016.
 */
public class EdiTransactionSetFactory {

    private EdiTransactionSetFactory() {}

    public static EdiTransactionSet createEdiTransactionSet(ByteBuffer data) {
        try {
            byte[] bytes = new byte[data.remaining()];
            data.get(bytes);
            String jsonString = new String(bytes);
            return createEdiTransactionSet(jsonString);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static EdiTransactionSet createEdiTransactionSet(String jsonString) {
        try {
            if (StringUtils.isEmpty(jsonString)) {
                jsonString = "{}";
            }
            ObjectMapper mapper = new ObjectMapper();
            return mapper.readValue(jsonString, EdiTransactionSet.class);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

From: Jim
Sent: Tuesday, May 10, 2016 12:42 PM
To: users@apex.incubator.apache.org
Subject: FW: Problems with Kinesis Stream

Hi Pradeep,

Here is an extract from my pom.xml with the relevant entries:

<!-- change this if you desire to use a different version of DataTorrent -->
<malhar.version>3.3.1-incubating</malhar.version>
<apex.version>3.3.0-incubating</apex.version>
<datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
<apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
<datatorrent.appconf.files>files/*</datatorrent.appconf.files>
<maven-enforcer-plugin.version>[1.0,2.0-alpha)</maven-enforcer-plugin.version>

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-kinesis</artifactId>
    <version>1.10.76</version>
</dependency>

Everything was in sync with your demo application. I did see in your demo application that you created your own StringKinesisInputOperator, where you override the getTuple function.
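For what it's worth, the conversion that such a getTuple override performs can be exercised without Kinesis at all, since record.getData() just returns a ByteBuffer. A minimal sketch of that decoding step (RecordDecoder and decodePayload are illustrative names, not part of the Malhar or AWS API):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative stand-in for the payload conversion inside a getTuple override:
// the Kinesis Record exposes its payload as a ByteBuffer, which must be drained
// into a byte[] before it can be turned into a String (and then into a POJO).
public class RecordDecoder {

    public static String decodePayload(ByteBuffer data) {
        byte[] bytes = new byte[data.remaining()];
        data.get(bytes); // consumes the buffer's remaining bytes
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer payload = ByteBuffer.wrap("{\"id\":1}".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodePayload(payload));
    }
}
```

Note that decodePayload consumes the buffer; a second read of the same ByteBuffer would return an empty string unless it is rewound first.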
In our app we create a KinesisEdiTransactionSetInputOperator, but we do not override the getTuple() function. Is there any reason that we should do that?

Here is our class. Note that we have some database access where we save the current shard position to a database table. If the application restarts, we can use setInitialOffset("earliest"), and a custom shardManager reads this table and sets the initial position to that location, so that we don't duplicate completed transactions and still pick up transactions that were added to the queue while the system was down. We need this for restartability: if we update the application and it starts under a new application id, we don't have to worry about missing or duplicating records (I think). Does anything stand out as missing?

package supplies.facility.edi.helpers;

import com.amazonaws.services.kinesis.model.Record;
import com.datatorrent.api.Context;
import com.datatorrent.contrib.kinesis.AbstractKinesisInputOperator;
import supplies.facility.edi.models.EdiTransactionSet;
import supplies.facility.edi.models.OptionalOperatorProperties;

import java.sql.PreparedStatement;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by Bob on 2/21/2016.
 */
public class KinesisEdiTransactionSetInputOperator extends AbstractKinesisInputOperator<EdiTransactionSet> {

    private Map<String, String> previousPosition = new HashMap<String, String>();

    private OptionalOperatorProperties optionalProperties = new OptionalOperatorProperties();

    public KinesisEdiTransactionSetInputOperator() {
        super();
    }

    public OptionalOperatorProperties getOptionalProperties() {
        return optionalProperties;
    }

    public void setOptionalProperties(OptionalOperatorProperties optionalProperties) {
        this.optionalProperties = optionalProperties;
    }

    @Override
    public EdiTransactionSet getTuple(Record record) {
        try {
            return EdiTransactionSetFactory.createEdiTransactionSet(record.getData());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void endWindow() {
        super.endWindow();
        // Persist the current shard positions so that we can recover, with no risk
        // of duplicate transactions, if we need to restart the application
        try {
            if (!previousPosition.equals(shardPosition)) {
                // Upsert every entry in the shard position map
                for (Map.Entry<String, String> entry : shardPosition.entrySet()) {
                    PreparedStatement statement = getOptionalProperties().getEdiJdbc().getConnection()
                        .prepareStatement("insert into kinesis_shard_manager values(?, ?) "
                            + "on conflict on constraint pk_kinesis_shard_manager "
                            + "do update set sequence = excluded.sequence");
                    statement.setString(1, entry.getKey());
                    statement.setString(2, entry.getValue());
                    statement.executeUpdate();
                    statement.close();
                }
                getOptionalProperties().getEdiJdbc().getConnection().commit();
                // Copy the map rather than aliasing it: if the framework updates
                // shardPosition in place, an aliased previousPosition would always
                // compare equal and later changes would never be persisted
                previousPosition = new HashMap<String, String>(shardPosition);
            }
        } catch (Exception e) {
            throw new RuntimeException("Exception while persisting shard positions", e);
        }
    }

    @Override
    public void setup(Context.OperatorContext context) {
        super.setup(context);
        try {
            // Connect to the EDI jdbcStore for this operator
            this.optionalProperties.getEdiJdbc().connect();
            this.optionalProperties.getEdiJdbc().getConnection().setAutoCommit(false);
        } catch (Exception e) {
            throw new RuntimeException("Exception setting up the EDI input operator", e);
        }
    }

    @Override
    public void teardown() {
        super.teardown();
        // Disconnect from the EDI jdbcStore
        this.optionalProperties.getEdiJdbc().disconnect();
    }
}

From: Pradeep Dalvi [mailto:pradeep.da...@datatorrent.com]
Sent: Tuesday, May 10, 2016 6:18 AM
To: Jim <jim@facility.supplies>
Subject: Re: Problems with Kinesis Stream

Hi Jim,

I don't have much background on the Kinesis operator, so I will coordinate with the right set of engineers and help you resolve the issues on priority. I was with Chaitanya to try out different options. We also tried restarting the application. We would like to know a little more detail about the scenarios, setup, and RTS version being used. As far as I recall from our last conversation, we had suggested using DT RTS 3.3.0-RC5.

Shall we schedule a call for this? Please feel free to suggest convenient timings. I shall check with Chaitanya.

Thanks,
Pradeep A. Dalvi

On Tue, May 10, 2016 at 3:51 PM, Jim <jim@facility.supplies> wrote:

Hi Chaitanya,

And you are adding new records, and see them flow in?
And if you restart the application, and have it set to "earliest", does it always retrieve all the records, then continue to get new ones as records are added?

We had two different developers on two different machines get the same kind of errors. I will compare our applications to yours to see what could be different.

Jim

From: Chaitanya Chebolu [mailto:chaita...@datatorrent.com]
Sent: Tuesday, May 10, 2016 5:17 AM
To: users@apex.incubator.apache.org
Subject: Re: Problems with Kinesis Stream

Hi Jim,

I created a sample application with the same dependencies which you specified. I ran with earliest and latest offsets, and it's working fine. I haven't observed any record loss in either scenario. We tried two different setups: local as well as on AWS.

Please find the pom and application at the location below:
https://gist.github.com/chaithu14/b7748af582e451eed8082995487f61d7

Regards,
Chaitanya

On Tue, May 10, 2016 at 1:18 PM, Jim <jim@facility.supplies> wrote:

Hello,

I am using Apex in an application where one of the inputs is an AWS Kinesis stream. I am using this over AWS SQS because I need to guarantee that the items are processed in the order in which they are received by the system. I have an operator that uses the base AbstractKinesisInputOperator.

We are currently using:
Apex version 3.3.0-incubating
Malhar version 3.3.1-incubating
aws-java-sdk-kinesis version 1.10.76

I am noticing strange behavior from this operator and cannot pin down where the issue is coming from: whether it is from the AWS SDK or from the DataTorrent Apex module. Here is what I am seeing:

1.) When I setInitialOffset("latest") and start the application, I do not always see the next transactions processed by the application. From what I can see it is very hit or miss which transactions are actually read and sent through for processing. I have no idea why.

2.)
When I setInitialOffset("earliest"), it does seem to pick up all records, and read new records, for a while. Then at some point it stops processing new records, and even if I restart the application so it should start again from the beginning, nothing is read in. If I then delete and recreate the Kinesis stream, it works again for a while, until it stops again.

Is anyone successfully using a Kinesis stream to process records, and confident that no issues like this are occurring, so that transactions are not being missed? Can someone at Apex/DataTorrent look into this, help figure out what is happening, and how we can fix it? Note that I am just about ready to go live, and would like to get this resolved this week!

Thanks,
Jim

--
Pradeep A. Dalvi
Software Engineer
DataTorrent (India)
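[Editor's note on the endWindow persistence pattern in the operator earlier in this thread: the "persist shard positions only when they changed since the last window" logic can be tested in isolation, away from Kinesis and JDBC. A minimal sketch, assuming a pluggable persist callback in place of the database upsert (ShardPositionTracker and persistIfChanged are hypothetical names, not part of Malhar):]

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Illustrative model of the endWindow dirty-check: shard positions are written
// out only when they have changed since the last window, and the snapshot is
// copied (not aliased) afterwards.
public class ShardPositionTracker {

    private Map<String, String> previousPosition = new HashMap<>();

    // persist stands in for the JDBC upsert; returns how many entries were written
    public int persistIfChanged(Map<String, String> shardPosition,
                                BiConsumer<String, String> persist) {
        if (previousPosition.equals(shardPosition)) {
            return 0; // nothing new to save this window
        }
        for (Map.Entry<String, String> e : shardPosition.entrySet()) {
            persist.accept(e.getKey(), e.getValue());
        }
        // Copy rather than alias: if the caller mutates shardPosition in place,
        // an aliased previousPosition would always compare equal afterwards and
        // later changes would never be persisted
        previousPosition = new HashMap<>(shardPosition);
        return shardPosition.size();
    }
}
```

The copy on the last line is the point of the sketch: assigning previousPosition = shardPosition directly would silently disable all subsequent writes whenever the framework updates the position map in place.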