Welcome!

Apache Authors: Elizabeth White, Pat Romanski, Liz McMillan, Christopher Harrold, Janakiram MSV

Blog Feed Post

Quick MapReduce with beanstalkd

At ProjectLocker, we operate a polyglot environment with a heavy Ruby bias. While we love Ruby and Rails, one of the drawbacks of Ruby is its Global VM Lock. In a nutshell, the Global VM Lock makes it harder to write Ruby code that can fully utilize a modern multi-core server. For Web applications, this isn’t a problem because the web server manages multiple processes for you (e.g. via Passenger). However, for offline processes, parallelism doesn’t come for free.

I was recently working on a project that involved the offline batch processing of lots of data. This project has been operating successfully for some time, but the data set has grown, causing the process to need more more time to complete than we’d like. So I dove in to see what we could do to speed it up. Fortunately, the process was still single-threaded, so we knew we’d be able to inject concurrency to increase throughput without adding hardware.

The job in question runs on a fairly well-equipped server, but the server was underutilized due to the process being serial. Here’s an outline of the initial code:

def main_job
  for retrieve_giant_dataset().each do |item|
    long_process(item)
  end

  summarize_results(retrieve_all_results()) 
end

def long_process(item)
  # Do some work on item that uses a lot of CPU time.
  item.save
end

That approach gets the job done, but I wanted to parallelize it. Conceptually, I wanted to transform the main_job method so that it looked something like this:

def main_job
  threads = []
  for retrieve_giant_dataset().each do |item|
    threads << Thread.new(item) do
      long_process(item)
    end
  end

  threads.each { |t| t.join }

  summarize_results(retrieve_all_results()) 
end

Unfortunately, it’s not that easy due to the aforementioned Global VM Lock. What I needed was a way to get my threads running on a bunch of independent processes. This is a problem tailor-made for a job queueing system. Enter beanstalkd, a simple & fast work queue. We paired beanstalkd with Stalker, a DSL that makes it easy to queue and process jobs from Ruby. Integrating these two was a cinch. Here’s what the restructured code looks like now:

def main_job
  for retrieve_giant_dataset().each do |item|
    Stalker.enqueue(JOB_NAME, :id => item.id)
  end

  beaneater = Beaneater::Pool.new(['localhost:11300'])
  tube = beaneater.tubes.find TUBE_NAME
  while tube.peek(:ready)
    sleep(5)
  end

  summarize_results(retrieve_all_results()) 
end

So instead of processing each item during the loop, now we just add each to the beanstalkd queue. Once we finish queueing all of the items, we wait until all of our entries have been processed by the worker processes. The workers are initiated via a jobs.rb file that looks something like this:

include Stalker
  
job JOB_NAME do |args|
  item = ItemClass.find(args['id'])
  Worker::long_process(item) 
end

We then start beanstalkd and a few worker processes and we’re off to the races. Now our job runs in parallel via multiple processes, and we can tune the number of worker processes we run to consume as much of the machine’s resources as we like. As a bonus, we can also run Stalker workers on other machines in our cluster for added parallelism. With just a few minor tweaks to our code, we’ve gone from single-threaded to a solution that is limited only by the capacity of the shared database used. Sweet!

What about the MapReduce reference in the title of this post? The MapReduce algorithm basically has two steps. In the Map step, you divide the work and assign it to worker nodes. The Reduce step simply combines the results of each individual node’s computation into an aggregate result. In our solution here, the Map step is done by us enqueuing our jobs into beanstalkd and then beanstalkd making the jobs available for consumption by our nodes. Our database serves to communicate the details of the jobs, and stands in for a shared filesystem like the HDFS used by Hadoop. I didn’t go into detail about this step, but our Reduce is also assisted by database aggregates; we’re able to construct a few simple queries that get us what we want from the database.

So there it is, distributed MapReduce for Ruby using beanstalkd, Stalker, and a healthy database. This is probably not the best solution if you need to scale to thousands or tens of thousands of workers. But if you just need to get tens of workers running in parallel quickly, you may be able to adapt this approach to fit your needs.

Read the original blog entry...

More Stories By Damon Young

Damon Young is Director of Sales at ProjectLocker.com. ProjectLocker was founded in 2003 to provide on-demand tools for software developers. Guided by the simple mission of helping companies build better software, ProjectLocker's services have expanded to include services for the complete lifecycle of software projects, from requirements documentation to build and test automation. ProjectLocker serves companies from startups to Fortune 1000 multinationals.

IoT & Smart Cities Stories
The deluge of IoT sensor data collected from connected devices and the powerful AI required to make that data actionable are giving rise to a hybrid ecosystem in which cloud, on-prem and edge processes become interweaved. Attendees will learn how emerging composable infrastructure solutions deliver the adaptive architecture needed to manage this new data reality. Machine learning algorithms can better anticipate data storms and automate resources to support surges, including fully scalable GPU-c...
Machine learning has taken residence at our cities' cores and now we can finally have "smart cities." Cities are a collection of buildings made to provide the structure and safety necessary for people to function, create and survive. Buildings are a pool of ever-changing performance data from large automated systems such as heating and cooling to the people that live and work within them. Through machine learning, buildings can optimize performance, reduce costs, and improve occupant comfort by ...
The explosion of new web/cloud/IoT-based applications and the data they generate are transforming our world right before our eyes. In this rush to adopt these new technologies, organizations are often ignoring fundamental questions concerning who owns the data and failing to ask for permission to conduct invasive surveillance of their customers. Organizations that are not transparent about how their systems gather data telemetry without offering shared data ownership risk product rejection, regu...
René Bostic is the Technical VP of the IBM Cloud Unit in North America. Enjoying her career with IBM during the modern millennial technological era, she is an expert in cloud computing, DevOps and emerging cloud technologies such as Blockchain. Her strengths and core competencies include a proven record of accomplishments in consensus building at all levels to assess, plan, and implement enterprise and cloud computing solutions. René is a member of the Society of Women Engineers (SWE) and a m...
Poor data quality and analytics drive down business value. In fact, Gartner estimated that the average financial impact of poor data quality on organizations is $9.7 million per year. But bad data is much more than a cost center. By eroding trust in information, analytics and the business decisions based on these, it is a serious impediment to digital transformation.
Digital Transformation: Preparing Cloud & IoT Security for the Age of Artificial Intelligence. As automation and artificial intelligence (AI) power solution development and delivery, many businesses need to build backend cloud capabilities. Well-poised organizations, marketing smart devices with AI and BlockChain capabilities prepare to refine compliance and regulatory capabilities in 2018. Volumes of health, financial, technical and privacy data, along with tightening compliance requirements by...
Predicting the future has never been more challenging - not because of the lack of data but because of the flood of ungoverned and risk laden information. Microsoft states that 2.5 exabytes of data are created every day. Expectations and reliance on data are being pushed to the limits, as demands around hybrid options continue to grow.
Digital Transformation and Disruption, Amazon Style - What You Can Learn. Chris Kocher is a co-founder of Grey Heron, a management and strategic marketing consulting firm. He has 25+ years in both strategic and hands-on operating experience helping executives and investors build revenues and shareholder value. He has consulted with over 130 companies on innovating with new business models, product strategies and monetization. Chris has held management positions at HP and Symantec in addition to ...
Enterprises have taken advantage of IoT to achieve important revenue and cost advantages. What is less apparent is how incumbent enterprises operating at scale have, following success with IoT, built analytic, operations management and software development capabilities - ranging from autonomous vehicles to manageable robotics installations. They have embraced these capabilities as if they were Silicon Valley startups.
As IoT continues to increase momentum, so does the associated risk. Secure Device Lifecycle Management (DLM) is ranked as one of the most important technology areas of IoT. Driving this trend is the realization that secure support for IoT devices provides companies the ability to deliver high-quality, reliable, secure offerings faster, create new revenue streams, and reduce support costs, all while building a competitive advantage in their markets. In this session, we will use customer use cases...