
Distributed Method Mutex – How To Prevent Duplicate Event Processing At Scale

Matt Watson, Insights for Dev Managers


Stackify handles millions of incoming metrics and log messages every day. This creates some unique challenges in coordinating the various types of events that are triggered as data flows in and system settings are modified. When events are constantly firing for various reasons across multiple servers, it can be very challenging to ensure that certain types of events do not run more than once at the same time (concurrency) or more often than a set interval (frequency). This blog post discusses how we coordinate and handle all of these events across multiple servers. Source code for this solution is on GitHub.


Problem #1: Concurrency – Doing the same thing multiple times at once

A simple example of this at Stackify is creating indexes within Elasticsearch. It would never make sense for multiple processes to try to create the same index at the same time: one would succeed and all the rest would throw an error. We normally create indexes in advance, but if an index is missing our system will automatically create it. (This could happen for a brand new client who signed up today and for whom we didn’t pre-generate the indexes the day before.) Our solution, described below, prevents the barrage of exceptions that would occur if all the servers tried to create the index concurrently.

How to prevent duplicate event processing

To solve this problem, we wrap the methods we want to limit with special logic that uses cache lookups and cache-based key locking. We use PostSharp to create a method attribute that we can apply to any method very easily. Once the attribute is applied, every call to the method first does a cache lookup in Redis to see whether the method is already executing. If it is currently executing on any server, we don’t allow it to run again. When we compile our code, PostSharp weaves this logic into every method that carries the attribute.

    [MethodMutex()]
    private bool DoSomething(int clientID)
    {
        /* Code goes here */
        return true;
    }

PostSharp allows us to do aspect-oriented programming, which wraps the method call in additional logic without our having to write that logic for each method. All we have to do is apply the attribute to the method.

OK, so how does it work?

Most caching systems provide locking mechanisms so that you can get an exclusive lock on a cache key to update it. This capability works perfectly as a distributed locking mechanism for method mutexes. Previously we did this with Azure Managed Cache, which is basically the same as AppFabric. We have recently switched to Redis and implemented locking with it. You can read more about cache tagging with Redis in this blog post.

The implementation of this varies from one cache client to another but the logic should be the same. The object you actually cache should be a simple object used to track the last time the method was executed and any other custom properties you want to track. The key should be made up of the method name and unique method parameters. That way you can call the same method with different parameters and it can support mutexing enforcement based off the actual method parameters and not just the method itself. It’s a beautiful thing.
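To make the key idea concrete, here is a minimal sketch of building a lock key from a method name and its arguments. The `MutexKey` class, its `Derive` method, and the `mutex:` prefix are hypothetical names chosen for illustration. They are not taken from the GitHub sample, whose `DeriveCacheKey` may work differently.

```csharp
using System;
using System.Globalization;
using System.Linq;

public static class MutexKey
{
    // Hypothetical helper: builds "mutex:<type>.<method>(<arg1>,<arg2>,...)".
    // Assumes the arguments have stable, meaningful string forms (ints, strings, ids).
    public static string Derive(string typeName, string methodName, params object[] args)
    {
        // A single null argument arrives as a null array, so guard against it.
        object[] safeArgs = args ?? new object[0];

        // Use the invariant culture so every server formats values identically.
        string argList = string.Join(",", safeArgs.Select(
            a => a == null ? "null" : Convert.ToString(a, CultureInfo.InvariantCulture)));

        return "mutex:" + typeName + "." + methodName + "(" + argList + ")";
    }
}
```

With this scheme, `MutexKey.Derive("IndexService", "DoSomething", 42)` yields `mutex:IndexService.DoSomething(42)` (where `IndexService` is a made-up class name), so calls for client 42 and client 43 lock independently. Arguments that can themselves contain commas would need escaping or hashing to avoid key collisions.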

When you do a “get and lock” type call to the cache, you can also pass a timeout to wait for the current lock to be released, if your caching provider allows that. This lets a caller wait until the current execution of the method is complete and then run the method again. Note that the calling code blocks while it waits, so this would be a bad idea in UI code.
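The wait-with-timeout behavior can be sketched as a simple polling loop. This is only an illustration under stated assumptions: `InMemoryLockStore` is a hypothetical in-process stand-in for the cache (a real setup would make an equivalent "set if absent, with expiry" call against Redis), and `MethodLock.AcquireWithWait` is an invented name, not the `TryGetLock` from our sample.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical in-memory stand-in for the distributed cache.
public class InMemoryLockStore
{
    // Maps each lock key to the time at which its lock expires.
    private readonly ConcurrentDictionary<string, DateTime> _expiry =
        new ConcurrentDictionary<string, DateTime>();

    // Takes the lock if the key is free or the previous lock has expired.
    public bool TryAcquire(string key, TimeSpan maxLockTime)
    {
        DateTime now = DateTime.UtcNow;
        DateTime newExpiry = now + maxLockTime;
        if (_expiry.TryAdd(key, newExpiry)) return true;

        DateTime current;
        return _expiry.TryGetValue(key, out current)
            && current <= now
            && _expiry.TryUpdate(key, newExpiry, current);
    }

    // Simplification: does not check that the caller still owns the lock.
    public void Release(string key)
    {
        DateTime ignored;
        _expiry.TryRemove(key, out ignored);
    }
}

public static class MethodLock
{
    // Polls until the lock is acquired or waitTimeout elapses.
    // The calling thread is blocked while it waits.
    public static bool AcquireWithWait(InMemoryLockStore store, string key,
                                       TimeSpan maxLockTime, TimeSpan waitTimeout)
    {
        DateTime deadline = DateTime.UtcNow + waitTimeout;
        while (true)
        {
            if (store.TryAcquire(key, maxLockTime)) return true;
            if (DateTime.UtcNow >= deadline) return false;
            Thread.Sleep(50); // back off briefly before retrying
        }
    }
}
```

A wait timeout of `TimeSpan.Zero` gives "try once and give up" behavior, which suits a mutex that should simply skip duplicate calls. The expiry on each lock is what keeps a crashed process from holding a key forever.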

Using cache locking around a method in this way does have some overhead. It requires a cache lookup every single time the method is called. So think about how often this method gets called and the performance impact before implementing this.

View the entire working sample on GitHub. Please note that our code uses the ServiceStack library for Redis.

    public override void OnInvoke(MethodInterceptionArgs args)
    {
        // The key is built from the method name and its arguments.
        string key = DeriveCacheKey(args);
        _redisConnStr = ConfigurationManager.AppSettings["redisConnStr"];

        try
        {
            using (var client = new RedisClient(_redisConnStr))
            {
                // Try to take the distributed lock for this key.
                LockResult<CacheObj> lockResult = TryGetLock<CacheObj>(client, key, _maxMethodLockTime, TimeSpan.FromSeconds(2));

                try
                {
                    if (!lockResult.Acquired)
                    {
                        // Another caller holds the lock, so skip this call.
                        ReturnWithoutRunning(args);
                    }
                    else
                    {
                        RunMethod(args);
                    }
                }
                finally
                {
                    // Clear the lock, even if the wrapped method throws.
                    if (lockResult.Handle != null)
                    {
                        lockResult.Handle.Handle.Dispose();
                    }
                }
            }
        }
        catch (Exception ex)
        {
            Debug.WriteLine(ex.ToString());
            ReturnWithoutRunning(args);
        }
    }

Problem #2: Frequency – Limiting how often we run a certain piece of code

In the previous example, we talked about preventing the same code from running more than once at the same time. Another important use case is limiting how frequently a method can be called, not just whether it runs concurrently.

A good example is a job that summarizes some data, which you want to run only once a minute. As data comes in, you can call a summarize method decorated with a method attribute that enforces this. It is very similar to the plain MethodMutex, except that it also blocks a call if the method has already been called within a configured number of seconds or minutes.

The performance impact of this is actually lower than that of the plain MethodMutex solution we covered first, because we also use a local memory cache to enforce the frequency. We query Redis to see whether the method is currently running and when it is allowed to run again. If we know the method can’t run again for a few seconds, that answer can be cached in local memory. This reduces the number of Redis queries and, depending on usage, improves performance.
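The local-cache shortcut can be sketched roughly as follows. This is an illustration under stated assumptions, not the code from our sample: `FrequencyGate` is a hypothetical class, a shared `ConcurrentDictionary` stands in for Redis, and the sketch covers only the frequency check (not the "is it currently running" check). The current time is passed in explicitly so the behavior is easy to follow.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical frequency gate: one instance per server, all sharing one store.
public class FrequencyGate
{
    private readonly ConcurrentDictionary<string, DateTime> _shared; // stand-in for Redis
    private readonly ConcurrentDictionary<string, DateTime> _local =
        new ConcurrentDictionary<string, DateTime>();                // per-server memory cache
    private readonly TimeSpan _minInterval;
    private int _sharedLookups;

    public FrequencyGate(ConcurrentDictionary<string, DateTime> sharedStore, TimeSpan minInterval)
    {
        _shared = sharedStore;
        _minInterval = minInterval;
    }

    // Number of times this gate had to consult the shared store.
    public int SharedLookups { get { return _sharedLookups; } }

    // Returns true if the caller may run the method now.
    public bool TryEnter(string key, DateTime now)
    {
        // Fast path: local memory already knows the method is not allowed yet.
        DateTime nextAllowed;
        if (_local.TryGetValue(key, out nextAllowed) && now < nextAllowed) return false;

        Interlocked.Increment(ref _sharedLookups);
        DateTime candidate = now + _minInterval;
        while (true)
        {
            // Nobody has run the method yet: claim the slot.
            if (_shared.TryAdd(key, candidate)) { _local[key] = candidate; return true; }

            DateTime existing;
            if (!_shared.TryGetValue(key, out existing)) continue;

            // Still inside the interval: remember when it reopens, then refuse.
            if (existing > now) { _local[key] = existing; return false; }

            // Interval elapsed: claim the next slot unless another caller beat us to it.
            if (_shared.TryUpdate(key, candidate, existing)) { _local[key] = candidate; return true; }
        }
    }
}
```

In this sketch, a server that has been refused once answers every further call from local memory until the interval ends, so repeated calls inside the window cost no round trip to the shared store.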

 View code for MethodMutexWithFrequency on GitHub

    [MethodMutexWithFrequency(60)]
    private bool SummarizeData(int clientID)
    {
        /* Code goes here */
        return true;
    }

Processing large volumes of events across many distributed servers creates its own set of challenges. Queueing can help with durability of the messages, but enforcing concurrency and frequency around event processing is not solved by queues alone. Share with us the solutions you’ve found to these problems!

 


About Matt Watson

Matt is the Founder & CEO of Stackify. He has been a developer/hacker for over 15 years and loves solving hard problems with code. While working in IT management he realized how much of his time was wasted trying to put out production fires without the right tools. He founded Stackify in 2012 to create an easy to use set of tools for developers.