Reactive MongoDB with Spring Boot

MongoDB has changed a lot over the last few years, and before this project I had not used it in production for more than eight years. I recently needed MongoDB as a multi-purpose data store. Its new features surprised me, but I was slightly disappointed by how weak the default read and write concerns are. However, you can raise both to "majority" and use causally consistent sessions to achieve causal consistency.

In this post, we will create a Spring Boot service, connect to MongoDB with the reactive driver, and run some queries. I used the free tier of MongoDB Atlas: I signed up and set up a simple cluster on AWS.

Signing up is easy, and Atlas can also load sample datasets for you, although loading them may take a while. Once that is done, head to start.spring.io, add the Spring Reactive Web (WebFlux) and Spring Data Reactive MongoDB dependencies, and generate the project. If you want to use Lombok, add it as well.

Part 1 – Basics

First, we will look at ReactiveMongoRepository, which extends ReactiveCrudRepository. If you are already familiar with the repository pattern and have used Spring Data before, there is nothing new here.

The sample data used in this post is loaded through MongoDB Atlas, and I am using the sample_analytics database. It contains three collections: accounts, customers, and transactions. First, create a model for Customer and a repository for it, then a service and a controller for customers.


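// Customer.java
import java.util.Date;
import java.util.List;

import lombok.Data;
import org.bson.types.ObjectId;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Data
@Document("customers")
public class Customer {
  @Id
  private ObjectId id;

  private String username;
  private String name;
  private String address;
  private String email;
  private Date birthdate;
  private Boolean active;

  // account_id values of the customer's accounts
  private List<Integer> accounts;
}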

Now call the endpoint at `/customer/fmiller`, or use any other username from the sample data. ReactiveMongoRepository internally uses ReactiveMongoTemplate (which we will see shortly) to run the find query { "username" : "fmiller" } with no field projection (logged as fields: Document{{}}), which fetches the customer document for us.

Note: If you are unable to run the application, there are a couple of things you may have to set. First, JDK 11 enables TLS 1.3 by default, and some JDK 11 builds have known TLS 1.3 issues that can prevent the SSL handshake from completing. To work around this, restrict the client to TLS 1.2 with the JVM argument -Djdk.tls.client.protocols=TLSv1.2. Second, if you are using Lombok and run into issues, enable annotation processing in your IDE (Eclipse or IntelliJ) before running the application.
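If you would rather not depend on a JVM argument, the same system property can be set from code. This is a minimal sketch, assuming it runs at the very start of main, before anything in the JVM (including the MongoDB driver) opens a TLS connection; the class and method names are hypothetical.

```java
/**
 * Programmatic alternative to -Djdk.tls.client.protocols=TLSv1.2.
 * Assumption: called before the first TLS connection is opened, because the JDK
 * reads this property when its TLS client defaults are first initialized.
 */
final class TlsDefaults {
    static final String PROPERTY = "jdk.tls.client.protocols";

    private TlsDefaults() {}

    /** Restricts client-side TLS to 1.2 unless a value was already supplied with -D. */
    static String pinClientTls12() {
        String existing = System.getProperty(PROPERTY);
        if (existing == null || existing.isBlank()) {
            System.setProperty(PROPERTY, "TLSv1.2");
        }
        return System.getProperty(PROPERTY);
    }
}
```

In a Spring Boot application you would call TlsDefaults.pinClientTls12() as the first statement of main, before SpringApplication.run(...). The JVM argument remains the more robust option, since it cannot be defeated by code that initializes TLS earlier.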

Repository methods make simple queries easy to build, but when you want complex queries or MongoDB's built-in features, you need MongoTemplate. Since we are using the reactive stack, we will use ReactiveMongoTemplate, the concrete implementation of the ReactiveMongoOperations interface. For demonstration purposes, we will use the transactions collection, where each document groups the stock market transactions of one account over a date range (a "bucket"). So we need two models, Transactions and Transaction: the first is the entity mapped to the collection, and the second is a POJO for an individual transaction embedded in it.

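// Transactions.java
import java.util.Date;
import java.util.List;

import lombok.Data;
import org.bson.types.ObjectId;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;

// One document per account and date range ("bucket"), embedding the individual trades
@Data
@Document("transactions")
public class Transactions {

  @Id
  private ObjectId id;

  @Field(value = "account_id")
  private Integer accountId;

  @Field(value = "transaction_count")
  private Integer transactionCount;

  @Field(value = "bucket_start_date")
  private Date bucketStartDate;

  @Field(value = "bucket_end_date")
  private Date bucketEndDate;

  private List<Transaction> transactions;
}

// Transaction.java
import java.math.BigDecimal;
import java.util.Date;

import lombok.Data;
import org.springframework.data.mongodb.core.mapping.Field;

// Embedded element of Transactions.transactions, not a collection of its own, so no @Document
@Data
public class Transaction {
  private Integer amount;
  private Date date;
  private String symbol;

  @Field(value = "transaction_code")
  private String transactionCode; // "buy" or "sell"

  private BigDecimal price;
  private BigDecimal total;
}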

Part 2 – Querying

We obviously need a TransactionRepository interface, but the important piece here is a custom repository fragment built on ReactiveMongoTemplate.

// The ID type matches the ObjectId @Id field of Transactions
public interface TransactionRepository
    extends ReactiveMongoRepository<Transactions, ObjectId>, CustomTransactionRepository {}

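First, we need a ReactiveMongoTemplate bean built from the MongoClient. Notice that I set the write concern to majority for every write made through this template, for the reason mentioned at the beginning of the post. MongoDB's historical defaults (write concern w: 1 and read concern "local") are weak: an acknowledged write can still be rolled back, for example after a primary failover, so you might lose data if the cluster is not configured properly. MongoDB 5.0 changed the default write concern to majority for most deployments, but setting it explicitly keeps the behavior independent of the server version.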

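// mongoClient is the reactive com.mongodb.reactivestreams.client.MongoClient auto-configured by Spring Boot
@Bean
public ReactiveMongoTemplate reactiveMongoTemplate(MongoClient mongoClient) {
  ReactiveMongoTemplate reactiveMongoTemplate = new ReactiveMongoTemplate(mongoClient, "sample_analytics");
  // Require acknowledgement from a majority of replica set members for every write
  reactiveMongoTemplate.setWriteConcern(WriteConcern.MAJORITY);
  return reactiveMongoTemplate;
}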

Starting with MongoDB 4.4, you can also set cluster-wide defaults with the setDefaultRWConcern admin command:

 db.adminCommand(
   {
     setDefaultRWConcern : 1,
     defaultReadConcern: { <read concern> },
     defaultWriteConcern: { <write concern> },
     writeConcern: { <write concern> },
     comment: <any>
   }
 ) 

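Next, we create a CustomTransactionRepository interface with three methods, plus its implementation. Spring Data picks up CustomTransactionRepositoryImpl automatically because its name is the fragment interface name followed by the Impl postfix.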

public interface CustomTransactionRepository {
  Flux<Transactions> getPagedTransactions(int page, int num);
  Flux<Transactions> getTransactionByAccountIds(List<Integer> accountIds);
  Flux<Transaction> getTransactionsWithFilters(List<Integer> accountIds, String movement);
}

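public class CustomTransactionRepositoryImpl implements CustomTransactionRepository {
  @Autowired
  private ReactiveMongoTemplate mongoTemplate;

  // page is zero-based. skip and limit are part of the query sent to the server, so only
  // the requested page is returned instead of streaming the whole collection to the client.
  // Sorting by _id keeps pages stable between requests.
  @Override
  public Flux<Transactions> getPagedTransactions(int page, int num) {
    Query pageQuery = new Query()
        .with(Sort.by("_id"))
        .skip((long) page * num)
        .limit(num);
    return mongoTemplate.find(pageQuery, Transactions.class);
  }

  // getTransactionByAccountIds and getTransactionsWithFilters are added below
}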
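The paging arithmetic behind getPagedTransactions (skip page * num documents, then take num) is easy to get off by one, so here it is in isolation. This is a plain-Java sketch with no MongoDB or Reactor involved; the Paging class is hypothetical, and it assumes zero-based pages.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Plain-Java illustration of skip/limit paging with zero-based page numbers. */
final class Paging {
    private Paging() {}

    /** Number of items to skip, computed as long so large page numbers cannot overflow int. */
    static long offset(int page, int pageSize) {
        if (page < 0 || pageSize <= 0) {
            throw new IllegalArgumentException("page must be >= 0 and pageSize > 0");
        }
        return (long) page * pageSize;
    }

    /** Same semantics as skip(page * pageSize) followed by limit(pageSize). */
    static <T> List<T> page(List<T> items, int page, int pageSize) {
        return items.stream()
                .skip(offset(page, pageSize))
                .limit(pageSize)
                .collect(Collectors.toList());
    }
}
```

With ten items and a page size of 4, page 0 holds items 0 to 3, page 2 holds only items 8 and 9, and page 3 is empty. Keep in mind that skip-based paging still makes the server walk past every skipped document, so it slows down as the page number grows; a range query on an indexed field such as _id is the usual alternative for deep pages.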

This method returns a page of transactions using the built-in skip and limit operations.

Let's iterate a bit more and try something more useful: getting the list of transactions for a customer. Remember, MongoDB is a document database, and the data is denormalized. We can achieve the RDBMS equivalent of a join in two ways. The first is to query the customers collection for the customer's account ids and then filter the transactions collection by them. The second is the $lookup aggregation stage, which performs a left outer join but has performance costs and limitations; for example, before MongoDB 5.1 the collection being joined could not be sharded. MongoDB recommends considering an embedded data model instead.

Start by adding the method below to the repository implementation, then create a transaction service that gets the list of accounts a customer holds and passes it to the repository as the filter. The method uses the fluent API provided by ReactiveFluentMongoOperations.

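// Static imports: org.springframework.data.mongodb.core.query.Criteria.where
// and org.springframework.data.mongodb.core.query.Query.query
@Override
public Flux<Transactions> getTransactionByAccountIds(List<Integer> accountIds) {
  return mongoTemplate.query(Transactions.class)
      .matching(query(where("account_id").in(accountIds)))  // { "account_id" : { "$in" : [...] } }
      .all();
}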
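The first join approach (read the customer's account ids, then filter transactions with $in) boils down to a set-membership filter. Here is a plain-Java sketch of that two-step lookup, with no MongoDB involved; the record types are simplified, hypothetical stand-ins for the Customer and Transactions documents.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** In-memory model of an application-side join between customers and transaction buckets. */
final class AppSideJoin {
    private AppSideJoin() {}

    // Hypothetical, trimmed-down stand-ins for the two document types.
    record CustomerDoc(String username, List<Integer> accounts) {}
    record TransactionBucket(int accountId, int transactionCount) {}

    /** Step 2 of the join: keep only buckets whose accountId belongs to the customer. */
    static List<TransactionBucket> bucketsFor(CustomerDoc customer, List<TransactionBucket> all) {
        // Step 1 result: the customer's account ids, in a HashSet for fast membership checks.
        Set<Integer> accountIds = new HashSet<>(customer.accounts());
        return all.stream()
                .filter(b -> accountIds.contains(b.accountId()))
                .collect(Collectors.toList());
    }
}
```

Compared with $lookup, this costs two round trips to the database instead of one, in exchange for two simple queries that can each be served by an index on their filter field (account_id for transactions), if such an index exists.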

Part 3 – Aggregation

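Next, let's look at aggregation. An aggregation in MongoDB is a pipeline made of stages. Each stage receives a set of documents, processes them, and passes its output to the next stage; the result of the final stage is returned as a cursor or, with $out or $merge, written to a collection. The complete set of available stages and operators is in the aggregation pipeline reference listed in the References section.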

We are going to find a customer's transactions across all of their accounts, filtered to either buy or sell trades. This has to be executed in stages:

  1. Match the customer's account ids to select their documents from the collection
  2. Unwind the transactions array so that each transaction becomes its own document
  3. Match the transaction code, which is either buy or sell
  4. Finally, project the results onto the required fields
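The four stages map closely onto ordinary Java stream operations: $match is filter, $unwind is flatMap, and $project is map. The plain-Java sketch below runs the same steps in memory, which can help when reasoning about what each stage emits. No MongoDB is involved, and the record types are simplified, hypothetical stand-ins for the documents.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** In-memory walk-through of the four aggregation stages using Java streams. */
final class PipelineSketch {
    private PipelineSketch() {}

    record Trade(String transactionCode, String symbol, int amount) {}
    record Bucket(int accountId, List<Trade> transactions) {}
    // Shape of a document after $unwind: bucket fields plus a single trade.
    record Unwound(int accountId, Trade transaction) {}
    // Shape of a document after $project: only the trade fields.
    record Row(String transactionCode, String symbol, int amount) {}

    static List<Row> run(List<Bucket> buckets, Set<Integer> accountIds, String movement) {
        return buckets.stream()
                // 1. $match: keep buckets whose accountId belongs to the customer
                .filter(b -> accountIds.contains(b.accountId()))
                // 2. $unwind: emit one element per entry of the transactions array
                .flatMap(b -> b.transactions().stream().map(t -> new Unwound(b.accountId(), t)))
                // 3. $match: keep only trades whose code equals movement ("buy" or "sell")
                .filter(u -> u.transaction().transactionCode().equals(movement))
                // 4. $project: keep just the trade fields the caller needs
                .map(u -> new Row(u.transaction().transactionCode(),
                        u.transaction().symbol(), u.transaction().amount()))
                .collect(Collectors.toList());
    }
}
```

One detail carries over directly: just as flatMap over an empty list yields nothing, $unwind by default drops documents whose array is empty or missing, unless preserveNullAndEmptyArrays is set.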

The equivalent aggregate command, as you would pass it to db.runCommand in the Mongo shell, looks like this:

 {
   "aggregate": "transactions",
   "pipeline": [
     {
       "$match": {
         "account_id": {
           "$in": [
             371138
           ]
         }
       }
     },
     {
       "$unwind": "$transactions"
     },
     {
       "$match": {
         "transactions.transaction_code": "sell"
       }
     },
     {
       "$project": {
         "date": "$transactions.date",
         "amount": "$transactions.amount",
         "transaction_code": "$transactions.transaction_code",
         "symbol": "$transactions.symbol",
         "price": "$transactions.price",
         "total": "$transactions.total"
       }
     }
   ]
 } 

ReactiveMongoTemplate fully supports the aggregation framework, so it is easy to translate this command into an aggregation pipeline in our repository implementation.

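// Static imports: org.springframework.data.mongodb.core.aggregation.Aggregation.*
// and org.springframework.data.mongodb.core.query.Criteria.where
@Override
public Flux<Transaction> getTransactionsWithFilters(List<Integer> accountIds, String movement) {
  Aggregation transactionAggregation = newAggregation(
      match(where("account_id").in(accountIds)),                   // 1. the customer's buckets
      unwind("transactions"),                                       // 2. one document per trade
      match(where("transactions.transaction_code").is(movement)),   // 3. "buy" or "sell"
      project("transactions.date", "transactions.amount", "transactions.transaction_code",
          "transactions.symbol", "transactions.price", "transactions.total")  // 4. keep the trade fields
  );

  // Run the pipeline against the "transactions" collection and map each result to Transaction
  return mongoTemplate.aggregate(transactionAggregation, "transactions", Transaction.class);
}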

As always, the entire code is available on GitHub. Happy coding.

References

https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/

https://docs.mongodb.com/manual/reference/command/setDefaultRWConcern/

https://docs.mongodb.com/manual/core/transactions/

http://jepsen.io/analyses/mongodb-4.2.6

https://docs.mongodb.com/manual/core/causal-consistency-read-write-concerns/
