Saturday 12 March 2016

Composite Repository Pattern

In our current day the usage of Object-Relational Mappers (ORMs) in almost any data access application is highly prevalent.  Whilst popular choices like Entity Framework provide many powerful features and reduce the amount of code required to be written for data access they sometimes can bring problems when an optimal solution is required.

One of the pitfalls with Entity Framework which I find is that when I run a Linq query to get an entity using a primary key and then subsequently run that exact same query again the context will indeed produce two identical SQL statements and run the queries against my DBMS separately.

//query 1
var order1 = context.Orders.Single(order => order.OrderId == 1);

//query 2
var order2 = context.Orders.Single(order => order.OrderId == 1);


Upon the first query executing the order entity is cached internally within the framework.  Upon the second query executing another SQL statement is executed and the current order entity in the cache is updated.  The references of order1 and order2 would point to the same object in memory.  One might argue that the second query shouldn't execute a SQL statement as we simply retrieved the previous object and it's located in the internal cache - in other words the repository Orders doesn't have a caching mechanism where it can return the cached object by primary key instead of executing the same subsequent query.  This problem is very specific to the retrieval of ONE entity from the repository using the primary key.  As the lamba expression for querying the Orders set changes I can understand that executing a SQL statement is necessary.

A second pitfall I find frustrating is that Entity Framework isn't thread-safe, which means if I run the above two queries concurrently on two different threads then an exception will be thrown by the context.  Therefore if I wanted to have a multi-threaded application which ran Linq queries concurrently against my data source then I would have to scope the contexts to their own threads - probably making the context short-lived for each call.

Whilst many developers don't see these as big issues there are times when I needed to use the repository pattern for an aggregate root within my domain model because of these issues.

For a simplied aggregate root of Order:

    public class Order
    {
        public int OrderId { get; set; }
        public DateTime OrderDate { get; set; }
    }

My repository implementation would be as follows:

    public class OrderSqlRepository : IRepository<Order, int>
    {
        string _connStr;

        public OrderSqlRepository(string connStr)
        {
            _connStr = connStr;
        }

        public Order Get(int key)
        {
            using (SqlConnection conn = new SqlConnection(_connStr))
            {
                conn.Open();
                using (SqlCommand cmd = new SqlCommand("usp_GetOrder", conn))
                {
                    cmd.CommandType = System.Data.CommandType.StoredProcedure;
                    cmd.Parameters.AddWithValue("@OrderId", key);
                    using (SqlDataReader reader = cmd.ExecuteReader())
                    {
                        if (reader.Read())
                        {
                            Order order = new Order()
                            {
                                OrderId = Convert.ToInt32(reader["OrderId"]),
                                OrderDate = Convert.ToDateTime(reader["OrderDate"])
                            };
                            return order;
                        }
                    }
                }
            }

            return null;
        }

        public void Save(int key, Order entity)
        {
            using (SqlConnection conn = new SqlConnection(_connStr))
            {
                conn.Open();
                using (SqlCommand cmd = new SqlCommand("usp_SaveOrder", conn))
                {
                    cmd.CommandType = System.Data.CommandType.StoredProcedure;
                    cmd.Parameters.AddWithValue("@OrderId", key);
                    cmd.Parameters.AddWithValue("@OrderDate", entity.OrderDate);
                    cmd.ExecuteNonQuery();
                }
            }
        }
    }

Whilst the above repository implementation doesn't actually solve any of my problems I then wrap this above repository inside a cache repository which uses the same interface through use of a composite pattern.  Inevitably allowing me to cache entities within my own internal caching mechanism.

    public class CacheRepository<TEntity, TKey> : IRepository<TEntity, TKey>
        where TEntity : class
    {
        readonly IRepository<TEntity, TKey> _repository;
        readonly ConcurrentDictionary<string, object> _lockers = new ConcurrentDictionary<string, object>();
        readonly MemoryCache _cache;
        readonly CacheItemPolicy _cachePolicy;

        public CacheRepository(IRepository<TEntity, TKey> repository)
            : this(repository, new TimeSpan(0, 5, 0))
        {
        }

        public CacheRepository(IRepository<TEntity, TKey> repository
            , TimeSpan slidingEntityExpiration)
        {
            if (repository == null)
                throw new ArgumentNullException("repository");

            _repository = repository;
            _cache = MemoryCache.Default;

            _cachePolicy = new CacheItemPolicy()
            {
                SlidingExpiration = slidingEntityExpiration,
                RemovedCallback = entry =>
                {
                    /*
                     * when an object is removed from the cache it
                     * would be sensible to keep the locker collection
                     * in sync so that we don't hold unnecessary lock
                     * objects for entities which no longer exist
                     */
                    object removedEntity;
                    _lockers.TryRemove(entry.CacheItem.Key, out removedEntity);
                }
            };
        }

        void SetCachedEntity(string key, TEntity entity)
        {
            /*
             * if we're updating an entity in the cache
             * we want to remove the original first
             */
            _cache.Remove(key);
            if (entity != null)
                _cache.Add(key, entity, _cachePolicy);
        }

        TEntity GetCachedEntity(string key)
        {
            return (TEntity)_cache.Get(key);
        }

        object GetLocker(string key)
        {
            return _lockers.GetOrAdd(key, new object());
        }

        public virtual TEntity Get(TKey key)
        {
            if (key == null)
                throw new ArgumentNullException("key");

            string keystr = key.ToString();

            TEntity cachedEntity;
            lock (GetLocker(keystr))
            {
                //try and retrieve the entity from the local cache
                cachedEntity = GetCachedEntity(keystr);
                if (cachedEntity == null)
                {
                    //object doesn't exist - therefore try and load it from the underlying repository    
                    cachedEntity = _repository.Get(key);
                    if (cachedEntity != null)
                    {
                        //entity loaded successfully therefore set it in the local cache
                        SetCachedEntity(keystr, cachedEntity);
                    }
                }
            }

            return cachedEntity;
        }

        public void Save(TKey key, TEntity entity)
        {
            if (key == null)
                throw new ArgumentNullException("key");

            string keystr = key.ToString();

            lock (GetLocker(keystr))
            {
                /*
                 * try and persist the entity to the underlying
                 * repository prior to updating the local cache
                 */
                _repository.Save(key, entity);

                //save was successful therefore update the local cache
                SetCachedEntity(keystr, entity);
            }
        }
    }

Whilst both repository implementations above are thread-safe, we may run into a problem in the CacheRepository where we may end up modifying the same entity on two separate threads.  Take the following ordered thread execution as an example:

Thread 1 Thread 2
var order = repository.Get(1);
var order = repository.Get(1);
order.CustomerId = 1;
order.OrderItems.Add(new OrderItem());
repository.Save(order);

As you can see in the sequencing of the two threads above, an OrderItem is added to the same Order which is referenced in the two threads.  The execution then reverts to thread #1 where the order is then saved.  Thread #1 isn't expecting a new OrderItem to be included as part of the Order which is currently being modified - the only change which should be saved is the change of CustomerId.  This problem occurs because both threads retrieve the same instance of the object of Order #1 from the CacheRepository.

To resolve this issue we can override the Get method in a derived class (CloneCacheRepository) to clone the entity which is returned.  This means any changes made between entity retrieval and persistence is only visible on the current thread doing the modifications - no other thread can see these changes.  Upon the entity being saved in the CacheRepository the internal cache is updated with the modified entity - no override was required for this.

    public class CloneCacheRepository<TEntity, TKey> : CacheRepository<TEntity, TKey>
        where TEntity : class, IClone<TEntity>
    {
        public CloneCacheRepository(IRepository<TEntity, TKey> repository)
            : base(repository, new TimeSpan(0, 5, 0))
        {
        }

        public CloneCacheRepository(IRepository<TEntity, TKey> repository
            , TimeSpan slidingEntityExpiration)
            : base(repository, slidingEntityExpiration)
        {
        }

        public override TEntity Get(TKey key)
        {
            var entity = base.Get(key);
            return entity != null ? entity.DeepClone() : entity;
        }
    }

With the above implementation of the CloneCacheRepository class we now clone the entity which is returned - we implement IClone on the aggregate root entity to allow cloning.  This should be a deep clone of all entities which are related and could potentially be modified, otherwise a shallow clone would be sufficient.

When the same thread sequence occurs, the only change which is persisted is that to the CustomerId property.  Subsequently, if thread #2 tries to persist it's changes to the repository after thread #1 has completed it should throw an exception if concurrency checking is in place on the database server.

We have a few notable elements to outline in the above CacheRepository class.  We are able to add functionality to the repository without having to change an existing concrete implementation, this can be achieved by implementing a new class which derives from the same IRepository interface and then injecting other Repository classes during instantiation.  This is beneficial when adhering to SOLID design principles because this follows the open-closed principle whereby we're adding functionality by creating a new implementation which accepts an existing repository for which to wrap - we're not required to modify an existing class which would break this principle.

Although switching to this repository pattern forfeits the ability for us to use Linq it does allow us to use this repository in a multi-threaded environment and also minimises the number of underlying calls to the database as the cache is called prior to the underlying repository used for data access.  I have used this repository pattern in micro-services where speed is essential.

The full source code of the Composite Repository Pattern can be found at GitHub.

Tuesday 9 February 2016

Retry Pattern for failed Invocations

There are numerous times when invoking a delegate may fail due to an unexpected exception occurring in which we would like to automatically retry the invocation up to a certain number of retries.  This can easily be achieved using the Retry Pattern as shown below:

public static bool TryInvoke<TException, T>(Func<T> del, out T value, out IList<Exception> exceptions, int attempts = 3, IInterval interval = null)
 where TException : Exception
{
 if (del == null) throw new ArgumentNullException("del");
 if (attempts <= 0) throw new ArgumentOutOfRangeException("attempts");

 exceptions = new List<Exception>();

 for (int i = 0; i < attempts; i++)
 {
  try
  {
   value = del();
   return true;
  }
  catch (TException ex)
  {
   exceptions.Add(ex);
   if (interval != null)
    interval.Sleep();
  }
 }

 value = default(T);
 return false;
}

The above generic implementation of the Retry Pattern also follows the structure of the Try-Parse Pattern in that if the invocation of the delegate fails on all of it's attempts then the "out" parameter for the return value of the delegate is set to it's default value and the method returns false.  Conversely, if the delegate succeeds then the return value of the delegate is assigned to the "out" parameter and returns true - this means we can invoke delegates as follows:

int value;
Func<int> calculateValue = null;
if (Invoker.TryInvoke<int>(calculateValue, out value))
{
 //print value
 Console.WriteLine("Value: {0}", value);
}
else
{
 //show error message to user
 Console.WriteLine("Error calculating value after X attempts");
}

It is also possible for us to gracefully retry on specific types of exceptions and throw exceptions on others.  For example, I may expect a failed attempt whilst downloading some data from an external source, therefore I may want to retry ONLY when a DownloadException is thrown, all other exceptions should cause an unhandled exception to be thrown:

string data;
Func<string> getData = null;
IList<Exception> exceptions;
try
{
 if (Invoker.TryInvoke<DownloadException, string>(getData, out data, out exceptions))
 {
  //succeeded
 }
 else
 {
  //gracefully handle exceptions
 }
}
catch (Exception ex)
{
 //unexpected exception
}

We may also want to run this logic asynchronously, for example when a user clicks a button on a GUI and requests to download some data from an external source:

Func<string> downloadData = () =>
{
 return new Downloader().Download();
};
var task = await Invoker.TryInvokeAsync<string>(downloadData);
if (task.Success)
{
 //invocation passed
 Console.WriteLine(task.Value);
}

Note how the syntax of the TryInvokeAsync method has changed slightly in that it no longer contains an out parameter for the assigned value - async methods are unable to support out parameters.

The complete source code of the Retry Invoker can be found on GitHub.