Saturday 12 March 2016

Composite Repository Pattern

Nowadays the use of Object-Relational Mappers (ORMs) is highly prevalent in almost any data access application.  Whilst popular choices like Entity Framework provide many powerful features and reduce the amount of data access code that has to be written, they can sometimes cause problems when an optimal solution is required.

One pitfall I find with Entity Framework is that when I run a LINQ query to fetch an entity by its primary key and then run exactly the same query again, the context generates two identical SQL statements and executes each of them against my DBMS.

    // query 1
    var order1 = context.Orders.Single(order => order.OrderId == 1);

    // query 2
    var order2 = context.Orders.Single(order => order.OrderId == 1);


When the first query executes, the order entity is cached (tracked) internally by the context.  When the second query executes, another SQL statement is still sent to the database; the context then matches the returned row against the entity it is already tracking, so order1 and order2 reference the same object in memory.  One might argue that the second query shouldn't execute a SQL statement at all, as the object we retrieved previously is already in the internal cache - in other words, querying the Orders set this way has no mechanism for returning the cached object by primary key instead of executing the same query again.  (DbSet.Find does check the context for a tracked entity first, but only for the lifetime of that one context.)  This problem is specific to retrieving ONE entity from the repository by its primary key.  When the lambda expression used to query the Orders set changes, I can understand that executing a SQL statement is necessary.

A second pitfall I find frustrating is that an Entity Framework context isn't thread-safe, which means that if I run the above two queries concurrently on two different threads against the same context, an exception is likely to be thrown.  Therefore, if I wanted a multi-threaded application which ran LINQ queries concurrently against my data source, I would have to scope each context to its own thread - probably by making the context short-lived and creating one per call.

Whilst many developers don't see these as big issues, there have been times when, because of them, I needed to use the repository pattern for an aggregate root within my domain model.

For a simplified aggregate root of Order:

    public class Order
    {
        public int OrderId { get; set; }
        public DateTime OrderDate { get; set; }
    }
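
The repositories in this post implement an IRepository<TEntity, TKey> interface which isn't listed here.  A minimal sketch of it, assuming it declares only the two members that the implementations below actually use (the real interface in the full source code may differ):

```csharp
// Assumed shape of the repository contract; only Get and Save are
// used by the implementations in this post.
public interface IRepository<TEntity, TKey>
{
    // Returns the entity stored under the given key
    // (the implementations in this post return null when none exists).
    TEntity Get(TKey key);

    // Persists the entity under the given key.
    void Save(TKey key, TEntity entity);
}
```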

My repository implementation would be as follows:

    using System;
    using System.Data.SqlClient;

    public class OrderSqlRepository : IRepository<Order, int>
    {
        readonly string _connStr;

        public OrderSqlRepository(string connStr)
        {
            _connStr = connStr;
        }

        public Order Get(int key)
        {
            using (SqlConnection conn = new SqlConnection(_connStr))
            {
                conn.Open();
                using (SqlCommand cmd = new SqlCommand("usp_GetOrder", conn))
                {
                    cmd.CommandType = System.Data.CommandType.StoredProcedure;
                    cmd.Parameters.AddWithValue("@OrderId", key);
                    using (SqlDataReader reader = cmd.ExecuteReader())
                    {
                        if (reader.Read())
                        {
                            Order order = new Order()
                            {
                                OrderId = Convert.ToInt32(reader["OrderId"]),
                                OrderDate = Convert.ToDateTime(reader["OrderDate"])
                            };
                            return order;
                        }
                    }
                }
            }

            return null;
        }

        public void Save(int key, Order entity)
        {
            using (SqlConnection conn = new SqlConnection(_connStr))
            {
                conn.Open();
                using (SqlCommand cmd = new SqlCommand("usp_SaveOrder", conn))
                {
                    cmd.CommandType = System.Data.CommandType.StoredProcedure;
                    cmd.Parameters.AddWithValue("@OrderId", key);
                    cmd.Parameters.AddWithValue("@OrderDate", entity.OrderDate);
                    cmd.ExecuteNonQuery();
                }
            }
        }
    }

The above repository implementation doesn't by itself solve either of my problems.  I therefore wrap it inside a cache repository which implements the same interface, through use of a composite pattern.  This ultimately allows me to cache entities within my own internal caching mechanism.

    using System;
    using System.Collections.Concurrent;
    using System.Runtime.Caching;

    public class CacheRepository<TEntity, TKey> : IRepository<TEntity, TKey>
        where TEntity : class
    {
        readonly IRepository<TEntity, TKey> _repository;
        readonly ConcurrentDictionary<string, object> _lockers = new ConcurrentDictionary<string, object>();
        readonly MemoryCache _cache;
        readonly CacheItemPolicy _cachePolicy;

        public CacheRepository(IRepository<TEntity, TKey> repository)
            : this(repository, new TimeSpan(0, 5, 0))
        {
        }

        public CacheRepository(IRepository<TEntity, TKey> repository
            , TimeSpan slidingEntityExpiration)
        {
            if (repository == null)
                throw new ArgumentNullException("repository");

            _repository = repository;
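            /*
             * note: MemoryCache.Default is shared by the whole process, so
             * two repositories for different entity types would collide on
             * equal key strings; a dedicated MemoryCache instance (or a
             * type-prefixed key) avoids this
             */
            _cache = MemoryCache.Default;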

            _cachePolicy = new CacheItemPolicy()
            {
                SlidingExpiration = slidingEntityExpiration,
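                RemovedCallback = entry =>
                {
                    /*
                     * when an object expires or is evicted from the cache
                     * keep the locker collection in sync so that we don't
                     * hold unnecessary lock objects for entities which no
                     * longer exist; explicit removals (made by
                     * SetCachedEntity while the lock is held) are skipped
                     * so that a lock object still in use isn't discarded
                     */
                    if (entry.RemovedReason == CacheEntryRemovedReason.Removed)
                        return;

                    object removedLocker;
                    _lockers.TryRemove(entry.CacheItem.Key, out removedLocker);
                }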
            };
        }

        void SetCachedEntity(string key, TEntity entity)
        {
            /*
             * if we're updating an entity in the cache
             * we want to remove the original first
             */
            _cache.Remove(key);
            if (entity != null)
                _cache.Add(key, entity, _cachePolicy);
        }

        TEntity GetCachedEntity(string key)
        {
            return (TEntity)_cache.Get(key);
        }

        object GetLocker(string key)
        {
            //the factory overload only allocates a lock object when the key is missing
            return _lockers.GetOrAdd(key, _ => new object());
        }

        public virtual TEntity Get(TKey key)
        {
            if (key == null)
                throw new ArgumentNullException("key");

            string keystr = key.ToString();

            TEntity cachedEntity;
            lock (GetLocker(keystr))
            {
                //try and retrieve the entity from the local cache
                cachedEntity = GetCachedEntity(keystr);
                if (cachedEntity == null)
                {
                    //object doesn't exist - therefore try and load it from the underlying repository    
                    cachedEntity = _repository.Get(key);
                    if (cachedEntity != null)
                    {
                        //entity loaded successfully therefore set it in the local cache
                        SetCachedEntity(keystr, cachedEntity);
                    }
                }
            }

            return cachedEntity;
        }

        public void Save(TKey key, TEntity entity)
        {
            if (key == null)
                throw new ArgumentNullException("key");

            string keystr = key.ToString();

            lock (GetLocker(keystr))
            {
                /*
                 * try and persist the entity to the underlying
                 * repository prior to updating the local cache
                 */
                _repository.Save(key, entity);

                //save was successful therefore update the local cache
                SetCachedEntity(keystr, entity);
            }
        }
    }

Whilst both repository implementations above are thread-safe, the CacheRepository introduces a problem of its own: two separate threads may end up modifying the same entity instance.  Take the following ordered thread execution as an example:

    Thread 1                          Thread 2
    var order = repository.Get(1);
                                      var order = repository.Get(1);
    order.CustomerId = 1;
                                      order.OrderItems.Add(new OrderItem());
    repository.Save(1, order);

As you can see in the sequencing of the two threads above, an OrderItem is added to the same Order which is referenced in the two threads.  The execution then reverts to thread #1 where the order is then saved.  Thread #1 isn't expecting a new OrderItem to be included as part of the Order which is currently being modified - the only change which should be saved is the change of CustomerId.  This problem occurs because both threads retrieve the same instance of the object of Order #1 from the CacheRepository.

To resolve this issue we can override the Get method in a derived class (CloneCacheRepository) to clone the entity which is returned.  This means any changes made between entity retrieval and persistence are only visible on the thread making the modifications - no other thread can see them.  When the entity is saved through the CacheRepository, the internal cache is updated with the modified entity - no override is required for this.

    public class CloneCacheRepository<TEntity, TKey> : CacheRepository<TEntity, TKey>
        where TEntity : class, IClone<TEntity>
    {
        public CloneCacheRepository(IRepository<TEntity, TKey> repository)
            : base(repository, new TimeSpan(0, 5, 0))
        {
        }

        public CloneCacheRepository(IRepository<TEntity, TKey> repository
            , TimeSpan slidingEntityExpiration)
            : base(repository, slidingEntityExpiration)
        {
        }

        public override TEntity Get(TKey key)
        {
            var entity = base.Get(key);
            return entity != null ? entity.DeepClone() : entity;
        }
    }

With the above implementation of the CloneCacheRepository class, the entity which is returned is now a clone - we implement IClone on the aggregate root entity to allow this.  The clone should be deep, covering all related entities which could potentially be modified; a shallow clone is sufficient only when there are no such related entities.
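
The IClone<T> interface isn't listed in this post.  A minimal sketch, assuming it exposes a single DeepClone method (the only member CloneCacheRepository calls); the OrderItem type and its Product property are hypothetical, added purely to show why related entities need copying too:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Assumed contract: the only member CloneCacheRepository relies on.
public interface IClone<T>
{
    T DeepClone();
}

// Hypothetical child entity, for illustration only.
public class OrderItem : IClone<OrderItem>
{
    public string Product { get; set; }

    public OrderItem DeepClone()
    {
        return new OrderItem { Product = Product };
    }
}

public class Order : IClone<Order>
{
    public int OrderId { get; set; }
    public DateTime OrderDate { get; set; }
    public List<OrderItem> OrderItems { get; set; } = new List<OrderItem>();

    public Order DeepClone()
    {
        return new Order
        {
            OrderId = OrderId,
            OrderDate = OrderDate,
            // copy each item so the clone shares no mutable state with the original
            OrderItems = OrderItems.Select(item => item.DeepClone()).ToList()
        };
    }
}

public static class CloneDemo
{
    public static void Main()
    {
        var cached = new Order { OrderId = 1, OrderDate = new DateTime(2016, 3, 12) };
        var copy = cached.DeepClone();

        // a change made to the copy is not visible on the cached instance
        copy.OrderItems.Add(new OrderItem { Product = "Widget" });

        Console.WriteLine(cached.OrderItems.Count); // 0
        Console.WriteLine(copy.OrderItems.Count);   // 1
    }
}
```

Had DeepClone copied only the list reference, the item added to the copy would also have appeared on the cached instance, which is exactly the problem the clone is meant to prevent.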

When the same thread sequence occurs, the only change which thread #1 persists is that to the CustomerId property.  Subsequently, if thread #2 tries to persist its changes to the repository after thread #1 has completed, the save should throw an exception, provided concurrency checking is in place on the database server.

There are a few notable elements to outline in the above CacheRepository class.  We are able to add functionality to the repository without having to change an existing concrete implementation.  This is achieved by creating a new class which implements the same IRepository interface and injecting another repository into it during instantiation.  This is beneficial when adhering to SOLID design principles because it follows the open-closed principle: we add functionality by creating a new implementation which wraps an existing repository, rather than modifying an existing class, which would break this principle.
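
As a sketch of that extension point, the following wraps a repository in a hypothetical CountingRepository which records how often the underlying store is read, without touching the wrapped class.  To keep the example self-contained, IRepository is redeclared here with an assumed two-member shape and InMemoryRepository is a stand-in for a real data source; neither name comes from the original source code:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Assumed shape of the repository contract used throughout this post.
public interface IRepository<TEntity, TKey>
{
    TEntity Get(TKey key);
    void Save(TKey key, TEntity entity);
}

// Stand-in for a real data source (hypothetical, not thread-safe).
public class InMemoryRepository<TEntity, TKey> : IRepository<TEntity, TKey>
    where TEntity : class
{
    readonly Dictionary<TKey, TEntity> _store = new Dictionary<TKey, TEntity>();

    public TEntity Get(TKey key)
    {
        TEntity entity;
        return _store.TryGetValue(key, out entity) ? entity : null;
    }

    public void Save(TKey key, TEntity entity)
    {
        _store[key] = entity;
    }
}

// New behaviour is added by wrapping, not by editing InMemoryRepository.
public class CountingRepository<TEntity, TKey> : IRepository<TEntity, TKey>
{
    readonly IRepository<TEntity, TKey> _inner;
    int _reads;

    public CountingRepository(IRepository<TEntity, TKey> inner)
    {
        if (inner == null)
            throw new ArgumentNullException("inner");

        _inner = inner;
    }

    // number of Get calls forwarded to the wrapped repository
    public int Reads { get { return _reads; } }

    public TEntity Get(TKey key)
    {
        Interlocked.Increment(ref _reads);
        return _inner.Get(key);
    }

    public void Save(TKey key, TEntity entity)
    {
        _inner.Save(key, entity);
    }
}

public static class CompositionDemo
{
    public static void Main()
    {
        var counting = new CountingRepository<string, int>(
            new InMemoryRepository<string, int>());

        counting.Save(1, "first order");
        Console.WriteLine(counting.Get(1)); // first order
        Console.WriteLine(counting.Reads);  // 1
    }
}
```

Placed between a CacheRepository and the SQL repository, a wrapper like this would show how many database reads the cache actually saves.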

Although switching to this repository pattern forfeits the ability to use LINQ, it does allow us to use the repository in a multi-threaded environment, and it minimises the number of underlying calls to the database because the cache is consulted before the underlying repository used for data access.  I have used this repository pattern in micro-services where speed is essential.

The full source code of the Composite Repository Pattern can be found at GitHub.