This is the first in a two-part series on distributed computing using Ray. This part shows how to use Ray on your local PC, and part 2 shows how to scale Ray to multi-server clusters in the cloud.
You've just gotten a brand-new 16-core laptop or desktop, and you're keen to test its power with some heavy computations.
You're a Python programmer, though not an expert yet, so you open your favourite LLM and ask it something like this.
"I want to count the number of prime numbers within a given input range. Please give me some Python code for this."
After a few seconds, the LLM gives you some code. You might tweak it a bit through a short back-and-forth, and eventually you end up with something like this:
import math, time, os

def is_prime(n: int) -> bool:
    if n < 2: return False
    if n == 2: return True
    if n % 2 == 0: return False
    r = int(math.isqrt(n)) + 1
    for i in range(3, r, 2):
        if n % i == 0:
            return False
    return True

def count_primes(a: int, b: int) -> int:
    c = 0
    for n in range(a, b):
        if is_prime(n):
            c += 1
    return c

if __name__ == "__main__":
    A, B = 10_000_000, 20_000_000
    total_cpus = os.cpu_count() or 1
    # Start "chunky"; we can sweep this later
    chunks = max(4, total_cpus * 2)
    step = (B - A) // chunks
    print(f"CPUs~{total_cpus}, chunks={chunks}")
    t0 = time.time()
    results = []
    for i in range(chunks):
        s = A + i * step
        e = s + step if i < chunks - 1 else B
        results.append(count_primes(s, e))
    total = sum(results)
    print(f"total={total}, time={time.time() - t0:.2f}s")
You run this program and it works perfectly. The only problem is that it takes quite a bit of time to run, maybe thirty to sixty seconds, depending on the size of your input range. That's probably unacceptable.
What do you do now? You have several options, with the three most common probably being:
– Parallelise the code using threads or multiprocessing
– Rewrite the code in a "fast" language like C or Rust
– Try a library like Cython, Numba or NumPy
These are all viable options, but each has disadvantages. Options 1 and 3 significantly increase your code complexity, and the middle option may require you to learn a new programming language.
What if I told you there was another way? One where the changes required to your existing code would be kept to an absolute minimum. One where your runtime is automatically spread across all your available cores.
That's precisely what the third-party Ray library promises to do.
What is Ray?
The Ray Python library is an open-source distributed computing framework designed to make it easy to scale Python programs from a laptop to a cluster with minimal code changes.
Ray makes it simple to scale and distribute compute-intensive application workloads, from deep learning to data processing, across clusters of remote computers, while also delivering practical runtime improvements on your laptop, desktop, or even a remote cloud-based compute cluster.
Ray provides a rich set of libraries and integrations built on a flexible distributed execution framework, making distributed computing easy and accessible to all.
In short, Ray lets you parallelise and distribute your Python code with minimal effort, whether it's running locally on a laptop or on a huge cloud-based cluster.
Using Ray
In the rest of this article, I'll take you through the basics of using Ray to speed up CPU-intensive Python code, and we'll work through some example code snippets to show you how easy it is to incorporate the power of Ray into your own workloads.
To get the most out of Ray, whether you're a data scientist or a machine learning engineer, there are a few key concepts you need to understand first. Ray is made up of several components.
Ray Data is a scalable library designed for data processing in ML and AI tasks. It offers flexible, high-performance APIs for AI tasks, including batch inference, data preprocessing, and data ingestion for ML training.
Ray Train is a flexible, scalable library designed for distributed machine learning training and fine-tuning.
Ray Tune is used for hyperparameter tuning.
Ray Serve is a scalable library for deploying models to serve online inference APIs.
Ray RLlib is used for scalable reinforcement learning.
As you can see, Ray is very focused on large language models and AI applications, but there's one last important component I haven't mentioned yet, and it's the one I'll be using in this article.
Ray Core is designed for scaling CPU-intensive, general-purpose Python applications. It spreads your Python workload over all the available cores on whichever system you're running it on.
This article will focus solely on Ray Core.
Two essential concepts to grasp within Ray Core are tasks and actors.
Tasks are stateless workers or services, implemented in Ray by decorating regular Python functions.
Actors (or stateful workers) are used, for example, when you need to keep track of and maintain shared state across your distributed cluster. Actors are implemented by decorating regular Python classes.
Both actors and tasks are defined using the same @ray.remote decorator. Once defined, tasks and actor methods are invoked with the special .remote() method provided by Ray. We'll look at an example of this next.
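To make that concrete, here is a minimal sketch of both (the square function and Counter class are purely illustrative):

import ray
ray.init()

# A task: a stateless function decorated with @ray.remote
@ray.remote
def square(x: int) -> int:
    return x * x

# An actor: a stateful class decorated with @ray.remote
@ray.remote
class Counter:
    def __init__(self):
        self.n = 0
    def increment(self) -> int:
        self.n += 1
        return self.n

ref = square.remote(4)        # launches the task; returns an ObjectRef immediately
counter = Counter.remote()    # starts the actor in its own worker process
print(ray.get(ref))                          # 16
print(ray.get(counter.increment.remote()))   # 1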
Setting up a development environment
Before we start coding, we should set up a development environment to keep our projects siloed so they don't interfere with each other. I'll be using conda for this, but feel free to use whichever tool you prefer. I'll be running my code in a Jupyter notebook inside a WSL2 Ubuntu shell on Windows.
$ conda create -n ray-test python=3.13 -y
$ conda activate ray-test
(ray-test) $ pip install "ray[default]"
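You can quickly check that the install worked (the version number you see will vary):

(ray-test) $ python -c "import ray; print(ray.__version__)"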
Code example – counting prime numbers
Let's revisit the example I gave at the beginning: counting the number of primes within the interval 10,000,000 to 20,000,000.
We'll run our original Python code and time how long it takes.
import math, time, os

def is_prime(n: int) -> bool:
    if n < 2: return False
    if n == 2: return True
    if n % 2 == 0: return False
    r = int(math.isqrt(n)) + 1
    for i in range(3, r, 2):
        if n % i == 0:
            return False
    return True

def count_primes(a: int, b: int) -> int:
    c = 0
    for n in range(a, b):
        if is_prime(n):
            c += 1
    return c

if __name__ == "__main__":
    A, B = 10_000_000, 20_000_000
    total_cpus = os.cpu_count() or 1
    # Start "chunky"; we can sweep this later
    chunks = max(4, total_cpus * 2)
    step = (B - A) // chunks
    print(f"CPUs~{total_cpus}, chunks={chunks}")
    t0 = time.time()
    results = []
    for i in range(chunks):
        s = A + i * step
        e = s + step if i < chunks - 1 else B
        results.append(count_primes(s, e))
    total = sum(results)
    print(f"total={total}, time={time.time() - t0:.2f}s")
And the output?
CPUs~32, chunks=64
total=606028, time=31.17s
Now, can we improve on that using Ray? Yes, by following this simple four-step process.
Step 1 - Initialise Ray. Add these two lines at the start of your code.
import ray
ray.init()
Step 2 - Create our remote function. That's easy. Just decorate the function we want to optimise with the @ray.remote decorator. The function to decorate is the one doing the most work. In our example, that's the count_primes function.
@ray.remote(num_cpus=1)
def count_primes(start: int, end: int) -> int:
    ...
Step 3 - Launch the parallel tasks. Call your remote function using the .remote() Ray directive.
refs.append(count_primes.remote(s, e))
Step 4 - Wait for all our tasks to complete. Each task in Ray returns an ObjectRef when it's called. This is a promise from Ray: it means Ray has set the task off running remotely and will return its value at some point in the future. We collect the values behind all the ObjectRefs returned by running tasks using the ray.get() function, which blocks until all tasks have completed.
results = ray.get(refs)
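As an aside, if you would rather handle results as they arrive instead of blocking on the whole batch, Ray also provides ray.wait(); here is a brief sketch of that pattern:

# Consume results as individual tasks finish
pending = list(refs)
total = 0
while pending:
    done, pending = ray.wait(pending, num_returns=1)  # blocks until one task finishes
    total += ray.get(done[0])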
Let's put this all together. As you will see, the changes to our original code are minimal: just four lines of code added, plus a print statement to display the number of nodes and cores we're running on.
import math
import time
# -----------------------------------------
# Change No. 1
# -----------------------------------------
import ray
ray.init()

def is_prime(n: int) -> bool:
    if n < 2: return False
    if n == 2: return True
    if n % 2 == 0: return False
    r = int(math.isqrt(n)) + 1
    for i in range(3, r, 2):
        if n % i == 0:
            return False
    return True

# -----------------------------------------
# Change No. 2
# -----------------------------------------
@ray.remote(num_cpus=1)  # pure-Python loop, so 1 CPU per task
def count_primes(a: int, b: int) -> int:
    c = 0
    for n in range(a, b):
        if is_prime(n):
            c += 1
    return c

if __name__ == "__main__":
    A, B = 10_000_000, 20_000_000
    total_cpus = int(ray.cluster_resources().get("CPU", 1))
    # Start "chunky"; we can sweep this later
    chunks = max(4, total_cpus * 2)
    step = (B - A) // chunks
    print(f"nodes={len(ray.nodes())}, CPUs~{total_cpus}, chunks={chunks}")
    t0 = time.time()
    refs = []
    for i in range(chunks):
        s = A + i * step
        e = s + step if i < chunks - 1 else B
        # -----------------------------------------
        # Change No. 3
        # -----------------------------------------
        refs.append(count_primes.remote(s, e))
    # -----------------------------------------
    # Change No. 4
    # -----------------------------------------
    total = sum(ray.get(refs))
    print(f"total={total}, time={time.time() - t0:.2f}s")
Now, has it all been worthwhile? Let's run the new code and see what we get.
2025-11-01 13:36:30,650 INFO worker.py:2004 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
/home/tom/.local/lib/python3.10/site-packages/ray/_private/worker.py:2052: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
nodes=1, CPUs~32, chunks=64
total=606028, time=3.04s
Well, the result speaks for itself. The Ray version is 10x faster than the regular Python code. Not too shabby.
Where does this increase in speed come from? Ray can spread your workload across all the cores on your system. A core is like a mini-CPU. When we ran our original Python code, it used just one core. That's fine, but if your CPU has more than one core, which most modern PCs do, then you're leaving money on the table, so to speak.
In my case, Ray sees 32 logical CPUs, so it's not surprising that the Ray code was far faster than the non-Ray code.
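If you're curious how the chunk count affects throughput, a quick sweep is easy. This sketch assumes A, B, and the Ray version of count_primes from the listing above are already defined:

# Time a few chunk counts to find a good task granularity
for chunks in (8, 32, 64, 128, 256):
    step = (B - A) // chunks
    t0 = time.time()
    refs = [count_primes.remote(A + i * step,
                                A + (i + 1) * step if i < chunks - 1 else B)
            for i in range(chunks)]
    total = sum(ray.get(refs))
    print(f"chunks={chunks:>4}: total={total}, time={time.time() - t0:.2f}s")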
Monitoring Ray jobs
Another point worth mentioning is that Ray makes it very easy to monitor job executions via a dashboard. Notice that in the output we got when running our Ray example code, we saw this,
... -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
It's showing a local URL because I'm running this on my desktop. If you were running this on a cluster, the URL would point to a location on the cluster head node.
When you click on the given URL, you should see something similar to this,
From this main screen, you can drill down to monitor many aspects of your Ray programs using the menu links at the top of the page.
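If you need the dashboard to listen somewhere other than the default local address, say on a cluster head node you want to reach from another machine, ray.init() accepts dashboard options; a small sketch, with illustrative values:

import ray
# Bind the dashboard to all interfaces on a specific port
ray.init(dashboard_host="0.0.0.0", dashboard_port=8265)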
Using Ray actors
I previously mentioned that actors are an integral part of Ray Core. Actors are used to coordinate and share data between Ray tasks. For example, say you want to set a global limit that ALL running tasks must adhere to. Let's say you have a pool of worker tasks making calls to an external API, but you want to cap the combined rate across the whole pool at five calls per second. Here is some code you might think would work.
import time, ray

ray.init(ignore_reinit_error=True, log_to_driver=False)

GLOBAL_QPS = 5.0             # intended cluster-wide cap
INTERVAL = 1.0 / GLOBAL_QPS
next_time = time.time()      # "shared" pacing state... or so we hope

@ray.remote
def call_api_with_limit(n_calls: int) -> int:
    global next_time
    done = 0
    for _ in range(n_calls):
        # Wait for "global" permission (but each worker has its own copy!)
        now = time.time()
        if now < next_time:
            time.sleep(next_time - now)
        # Reserve the next slot; guard against drift
        next_time = max(next_time + INTERVAL, time.time())
        done += 1  # pretend API call
    return done

if __name__ == "__main__":
    NUM_WORKERS = 10
    CALLS_EACH = 20
    total_calls = NUM_WORKERS * CALLS_EACH
    t0 = time.time()
    ray.get([call_api_with_limit.remote(CALLS_EACH) for _ in range(NUM_WORKERS)])
    dt = time.time() - t0
    print(f"Total calls: {total_calls}")
    print(f"Intended GLOBAL_QPS: {GLOBAL_QPS}")
    print(f"Expected time if truly global-limited: ~{total_calls / GLOBAL_QPS:.2f}s")
    print(f"Actual time with 'global var' (broken): {dt:.2f}s")
    print(f"Observed cluster QPS: ~{total_calls / dt:.1f} (should have been ~{GLOBAL_QPS})")
We have used a global variable to hold the rate-limiter state, and the code is syntactically correct and runs without error. Unfortunately, you won't get the result you expected. That's because each Ray task runs in its own process space and has its own copy of the global variable. The global variable is NOT shared between tasks. So when we run the above code, we'll see output like this,
Total calls: 200
Intended GLOBAL_QPS: 5.0
Expected time if truly global-limited: ~40.00s
Actual time with 'global var' (broken): 3.80s
Observed cluster QPS: ~52.6 (should have been ~5.0)
To fix this, we use an actor. Recall that an actor is just a Ray-decorated Python class. Here is the code with actors.
import time, ray

ray.init(ignore_reinit_error=True, log_to_driver=False)

# This is our actor
@ray.remote
class GlobalPacer:
    """Serialise calls so the cluster-wide rate <= qps."""
    def __init__(self, qps: float):
        self.interval = 1.0 / qps
        self.next_time = time.time()

    def acquire(self):
        # Wait inside the actor until we can proceed
        now = time.time()
        if now < self.next_time:
            time.sleep(self.next_time - now)
        # Reserve the next slot; guard against drift
        self.next_time = max(self.next_time + self.interval, time.time())
        return True

@ray.remote
def call_api_with_limit(n_calls: int, pacer):
    done = 0
    for _ in range(n_calls):
        # Wait for global permission
        ray.get(pacer.acquire.remote())
        # pretend API call (no extra sleep here)
        done += 1
    return done

if __name__ == "__main__":
    NUM_WORKERS = 10
    CALLS_EACH = 20
    GLOBAL_QPS = 5.0  # cluster-wide cap
    total_calls = NUM_WORKERS * CALLS_EACH
    expected_min_time = total_calls / GLOBAL_QPS
    pacer = GlobalPacer.remote(GLOBAL_QPS)
    t0 = time.time()
    ray.get([call_api_with_limit.remote(CALLS_EACH, pacer) for _ in range(NUM_WORKERS)])
    dt = time.time() - t0
    print(f"Total calls: {total_calls}")
    print(f"Global QPS cap: {GLOBAL_QPS}")
    print(f"Expected time (if capped at {GLOBAL_QPS} QPS): ~{expected_min_time:.2f}s")
    print(f"Actual time with actor: {dt:.2f}s")
    print(f"Observed cluster QPS: ~{total_calls/dt:.1f}")
Our limiter code is encapsulated in a class (GlobalPacer) and decorated with @ray.remote. Because a single actor instance holds the pacing state, every task's acquire call goes through it, so the limit really is global. We can see the difference this makes by running the updated code.
Total calls: 200
Global QPS cap: 5.0
Expected time (if capped at 5.0 QPS): ~40.00s
Actual time with actor: 39.86s
Observed cluster QPS: ~5.0
Summary
This article introduced Ray, an open-source Python framework that makes it easy to scale compute-intensive programs from a single core to multiple cores, or even a cluster, with minimal code changes.
I briefly covered the key components of Ray (Ray Data, Ray Train, Ray Tune, Ray Serve, and Ray Core), emphasising that Ray Core is ideal for general-purpose CPU scaling.
I explained some of the essential concepts in Ray Core, such as tasks (stateless parallel functions), actors (stateful workers for shared state and coordination), and ObjectRefs (a future promise of a task's return value).
To showcase the advantages of using Ray, I began with a simple CPU-intensive example, counting prime numbers over a range, and showed how a naive single-core Python implementation can be slow.
Instead of rewriting the code in another language or using complex multiprocessing libraries, Ray lets you parallelise the workload in just four simple steps and only a few extra lines of code:
- ray.init() to start Ray
- decorating your functions with @ray.remote to turn them into parallel tasks
- .remote() to launch tasks concurrently, and
- ray.get() to collect task results.
This approach cut the runtime of the prime-counting example from ~30 seconds to ~3 seconds on a machine with 32 logical cores.
I also mentioned how easy it is to monitor running jobs in Ray using its built-in dashboard and showed how to access it.
Finally, I provided an example of using a Ray actor, showing why global variables aren't suitable for coordinating across tasks, since each worker has its own memory space.
In the second part of this series, we'll see how to take things to another level by enabling Ray jobs to use even more CPU power as we scale to large multi-node servers in the cloud via Amazon Web Services.
