In parts 1 and 2 of this series we learned how to build a multiplatform mobile application with two-way data sync. Today we are going to see how to take advantage of asynchronous queues within RhoConnect to improve our response times and resource usage.


By the end of part 2, every user of our mobile application was synchronizing locally-created data with our RhoConnect server: when you saved a search on your mobile device, every other user would also get it on theirs. To do that, we implemented the “search” source adapter, to link mobile users with our sample SQLite database and allow for the basic CRUD operations. We were not communicating with Twitter to actually perform the searches that users wanted; now is the time to fix that.


When and how do we communicate with an external server and keep our data up to date? As always, we have several alternatives, some better than others:


The first option that comes to mind is to do it inside the “tweet” source adapter. Remember that a source adapter is the piece of code we write to glue our RhoConnect server to an external data source. It seems like a reasonable place for our tweet-fetching code, but there is an important caveat: this code would be talking to an external server over the network, which entails high latency and potential for errors. Mobile clients would be left waiting while our RhoConnect server makes a request to Twitter and receives the reply, which is generally a bad idea, as we want every synchronization to be as quick as possible.


Can we decouple data retrieval from the external server and data synchronization with our mobile clients?


In our RhoConnect instance, we could start a new thread to get the information from the remote data source at defined intervals and store the results locally. All our source adapter would need to do then is get the data from our local cache and send it to the mobile client. But what would happen if this task, instead of being network-bound, required heavy CPU usage? As our usage grows, we may need to offload it to a different machine in order to save resources on our main server.
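To make the thread-plus-cache idea concrete, here is a minimal sketch in plain Ruby. It is not RhoConnect code: the `CachedFeed` class, its method names and the sample data are all made up for illustration, and the block passed to it stands in for the slow call to the remote data source.

```ruby
require 'thread'

# Hypothetical in-process cache that a background thread keeps fresh.
class CachedFeed
  def initialize(interval_seconds, &fetcher)
    @interval = interval_seconds
    @fetcher  = fetcher   # block that talks to the slow remote source
    @data     = {}
    @lock     = Mutex.new
  end

  # Fetch once and store the result; on error the previous data is kept.
  def refresh
    fresh = @fetcher.call
    @lock.synchronize { @data = fresh }
  rescue StandardError => e
    warn "refresh failed: #{e.message}"
  end

  # Start the periodic refresh loop in its own thread.
  def start
    Thread.new do
      loop do
        refresh
        sleep @interval
      end
    end
  end

  # What a source adapter would call: a fast, local read.
  def read
    @lock.synchronize { @data.dup }
  end
end

# Made-up data in place of a real network call.
feed = CachedFeed.new(300) { { "1" => { :status => "hello" } } }
feed.refresh
cached = feed.read
```

The point of the design is that `read` never touches the network, so a slow or failing remote server only delays the next refresh, not the mobile clients.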


Resque to the rescue


RhoConnect comes integrated with Resque, a library for distributed asynchronous job execution. You send jobs to a queue and, on the same or a different machine, a set of worker processes watches the queue and runs the jobs as they arrive. This sounds ideal for our use case: we can run the worker processes on the same machine as our RhoConnect instance (the default) and, when we start to get a lot of traffic, move the workers to a different server without affecting anything else. If the server with the workers goes down, the rest of our application will keep working uninterrupted.
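If the producer/worker model is new to you, the following sketch imitates it inside a single Ruby process using the standard `Queue` class. It is only an analogy: Resque keeps its queues in Redis and its workers are separate processes, and `EchoJob` is a made-up job class.

```ruby
require 'thread'

# Stand-in for a job class: like a Resque job, it exposes a class-level perform.
class EchoJob
  def self.perform(text)
    text.upcase
  end
end

jobs    = Queue.new   # the queue that producers push to
results = Queue.new   # collected only so the example has something to inspect

# The "worker": pops jobs as they arrive and runs them.
worker = Thread.new do
  while (job = jobs.pop) != :stop
    klass, args = job
    results << klass.perform(*args)
  end
end

# The "producer": enqueues work and moves on without waiting for it.
jobs << [EchoJob, ["first"]]
jobs << [EchoJob, ["second"]]
jobs << :stop

worker.join
processed = []
processed << results.pop until results.empty?
```

The producer never calls `perform` itself, which is what lets the real thing move the workers to another machine without touching the code that enqueues jobs.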


Where do we start?

The first thing we need is code to retrieve tweets from Twitter. In our tweetserver application, let’s create a “lib” folder and, within it, a file called “tweetfetcher.rb” with the following content:


require 'open-uri'
require 'json'

class TweetFetcher
  # Jobs must tell resque which queue they must be placed in
  def self.queue
    :tweetfetcher
  end

  # The perform method is where the job gets its work done
  def self.perform()

    # get all searches from our local database
    search_rows = Application::database.execute "Select id,query from Searches"

    search_rows.each do |search|
      begin
        # Encode the search terms so they are safe to use in a query string
        search_query = URI.encode_www_form_component(search["query"])

        # Search twitter for our query terms - this is the 1.0 API, which is deprecated now but still works as of this writing
        # It has the advantage of not requiring authentication, making our code very simple
        uri = URI("{search_query}&rpp=5&include_entities=false&result_type=mixed")

        # open-uri adds a read method to HTTP URIs that returns the response body
        search_api_results = uri.read

        json_results = JSON.parse(search_api_results)

        # Get the tweets from the response...
        twitter_results = json_results["results"]

        tweets_for_search = {}

        #... and build a hash of hashes in the format that the RhoConnect source adapter expects:
        # {
        #  "PRIMARY KEY 1" => { :property1 => :value1, :property2 => :value2 },
        #  "PRIMARY KEY 2" => { :property1 => :value1, :property2 => :value2 }
        # }

        twitter_results.each do |tweet|
          tweets_for_search[tweet["id_str"]] = { :search_id => search["id"], :status => tweet["text"]}
        end

        tweets_for_search_key = "tweets_for_search:#{search["id"]}"
        # Save our hash of hashes to Redis so that the "tweet" source adapter can find it later
        Store.put_data(tweets_for_search_key, tweets_for_search)
      rescue StandardError => e
        # an error occurred - there's not much else we can do about this right now
        puts "Error: #{e}"
      end
    end
  end
end

In this code, we are fetching tweets from the Twitter Search API and building a hash of hashes that the “tweet” source adapter can then return directly to mobile clients. We could also have stored just the raw results from the HTTP request and had the source adapter parse them, but by doing it this way we offload any CPU-heavy processing to the worker. As we reasoned earlier, this will come in handy if we have to move to a multi-server setup in the future.
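The hash-of-hashes conversion is the part of the job that is easiest to try in isolation. Here is a small sketch of it as a standalone function; `tweets_to_records` is a hypothetical helper name and the sample tweets are invented, shaped like the `id_str` and `text` fields the job reads.

```ruby
# Hypothetical helper: turn an array of tweet hashes into { primary key => attributes }.
def tweets_to_records(search_id, tweets)
  tweets.each_with_object({}) do |tweet, records|
    records[tweet["id_str"]] = { :search_id => search_id, :status => tweet["text"] }
  end
end

# Invented sample data with the two fields our job uses.
sample = [
  { "id_str" => "101", "text" => "first tweet" },
  { "id_str" => "102", "text" => "second tweet" }
]

records = tweets_to_records(7, sample)
```

Each tweet ID becomes a primary key, and the value holds only the properties the mobile client needs.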


The next step is to get this job into the queue so that it executes every now and then. Open application.rb and add the following: at the top,



require_relative "lib/tweetfetcher.rb"



and near the end, after the database method:



    def before_run
      # Run the scheduling loop in its own thread so that server startup is not blocked
      Thread.new do
        loop do
          # Is our tweet fetching job already enqueued?
          existing_job = Resque.peek(TweetFetcher::queue)

          # If not, add it to the queue
          Resque.enqueue(TweetFetcher) unless existing_job

          # We will fetch tweets approximately once every 5 minutes and sleep peacefully the rest of the time
          sleep 5 * 60
        end
      end
    end

We spawn a new thread that runs an infinite loop: on each pass it looks at the queue and, if there is no TweetFetcher job awaiting execution, enqueues one.
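The check-then-enqueue step is what keeps jobs from piling up when the worker is slow or offline. A minimal sketch of that logic, with a plain array standing in for the Resque queue and `enqueue_once` as a made-up helper name:

```ruby
# Hypothetical helper: add a job only if an identical one is not already waiting.
def enqueue_once(queue, job)
  return false if queue.include?(job)
  queue << job
  true
end

pending = []
first_attempt  = enqueue_once(pending, "TweetFetcher")   # queue was empty
second_attempt = enqueue_once(pending, "TweetFetcher")   # job is still waiting
```

A check like this only sees jobs that are still waiting. Once a worker has picked a job up it is no longer in the queue, so a long-running job could in principle overlap with the next one.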


When does this “before_run” method run? By default, never: it is something we just made up, so we need to invoke it ourselves. Open and look around line 16:




# Load RhoConnect application
require './application'


# run RhoConnect Application



Just add a call to “Application.before_run” between the “require ./application” and “run” calls.


Now we are ready to start the queue: open a console and run the following command:



QUEUE=tweetfetcher rake resque:work



Now the queue is online and there is a worker running jobs as they come. We can connect to Redis and check that our data is there:



$ redis-cli keys tweets_for_search*



Now that we are getting the tweets, we need to forward them to the mobile app when it synchronizes with the server. That will be done in the “tweet” source adapter (sources/tweet.rb) and, in particular, in its query method, which we must update as follows:


  def query(params=nil)
    @result = {}
    search_rows = Application::database.execute("Select id,query from Searches")
    search_rows.each do |search|
      search_id = search["id"]
      tweets_redis_key = "tweets_for_search:#{search_id}"
      tweets_for_search = Store.get_data(tweets_redis_key)
      # Add the cached tweets for this search to what we send to the client
      @result.merge!(tweets_for_search) if tweets_for_search
    end
  end


Pretty simple, right? Just see what searches there are in our database and extract from Redis the results of our TweetFetcher job.
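To see the merging step on its own, here is a sketch that swaps Redis for an in-memory hash. `FakeStore` and `collect_tweets` are hypothetical names; `FakeStore` imitates only the two `Store` calls used in this article and assumes a missing key reads back as an empty hash.

```ruby
# Hypothetical stand-in for the Redis-backed Store, just enough for this sketch.
class FakeStore
  def initialize
    @data = {}
  end

  def put_data(key, value)
    @data[key] = value
  end

  # Assumption: a key that was never written reads back as an empty hash.
  def get_data(key)
    @data.fetch(key, {})
  end
end

# Merge the cached tweets of every search into one hash of hashes.
def collect_tweets(store, search_ids)
  search_ids.each_with_object({}) do |search_id, result|
    result.merge!(store.get_data("tweets_for_search:#{search_id}"))
  end
end

store = FakeStore.new
store.put_data("tweets_for_search:1", { "101" => { :search_id => 1, :status => "a" } })
store.put_data("tweets_for_search:2", { "202" => { :search_id => 2, :status => "b" } })

# Search 3 has no cached tweets yet and simply contributes nothing.
all_tweets = collect_tweets(store, [1, 2, 3])
```

Because tweet IDs are the keys, a tweet that matches two searches would appear only once, attached to whichever search was merged last.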


Our work is now complete. Start the RhoConnect server with


$ rhoconnect start


and run the mobile application: all users will be synchronizing tweets from the server, automatically and efficiently.


Note that, while we completely changed the implementation of the source adapter (we were previously returning a hardcoded hash and now we have this whole setup of distributed network requests), we did not touch the mobile application, which has continued working unchanged, oblivious to all the changes in the backend.


About Us:


Kutir Mobility is your partner in the enterprise mobile app space, empowering your team with our custom training sessions and complementing it with our own in-house development expertise. Get in touch today to schedule a free consultation with one of our mobile architects.