Multithreading with the Spring Boot Framework, the Clever Way [Java]

Load sharing in an application is one of the most common scenarios every developer comes across. There are multiple ways to do it; however, I personally prefer the BlockingQueue way. Thanks to Spring dependency injection, it saves lots of calories.

There are basically two ways we can do this:

  1. The crude way – Using locks, the synchronized keyword, wait and notify, and a lot of other nerd stuff. With all of that we can achieve some control over thread behaviour – i.e. we can make a producer produce a message and a consumer consume it (see the sketch after this list).
  2. The clever way – The crude way is boring and complex. We need another solution, and thankfully Spring offers just what we need. Apart from gaining control over the producer and consumer threads, we can trigger as many consumers as we need – with absolutely 0 lines of extra code.
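
To see why I call the first way crude, here is a minimal sketch of a hand-rolled single-slot queue built with synchronized, wait and notify. This is a hypothetical illustration for contrast only; it is not part of the demo that follows.

class MessageBox {

	private String message;
	private boolean empty = true;

	// Producer side: blocks until the previous message has been taken
	public synchronized void put(String newMessage) throws InterruptedException {
		while (!empty) {
			wait();
		}
		message = newMessage;
		empty = false;
		notifyAll();
	}

	// Consumer side: blocks until a message is available
	public synchronized String take() throws InterruptedException {
		while (empty) {
			wait();
		}
		empty = true;
		notifyAll();
		return message;
	}
}

All of that bookkeeping is exactly what a BlockingQueue gives us for free.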

Let's discuss the latter with a scenario. But just before that, I recommend you read
multi-threading-with-springboot-framework to get an idea of how annotation-based multithreading can be done in Spring Boot.

Sample Scenario – Web Crawler

Assume that you are developing a multi-threaded web scraper that launches 20 threads in parallel to scrape web pages faster. Each thread is assigned a specific set of tasks:

  1. Validate the URL
  2. Download the page
  3. Extract all the information
  4. Separate hyperlinks
  5. Put all the links into the queue.
  6. Stop

So every time we launch a new thread, we need to supply a new URL to crawl. To achieve this we need a couple of things:

Queue – A queue to hold the URLs temporarily
Scheduler – Fetches a URL from the Queue and calls the Downloader
Downloader – Downloads the page and performs the six steps above

A typical web crawler flow looks like this (figure from Wikipedia):

[Figure from Wikipedia: high-level web crawler architecture – a Scheduler feeds URLs from a Queue to a Multi-threaded downloader, with downloaded text and metadata going to storage]

We will design only the Queue, Scheduler, and Multi-threaded downloader parts from the picture above and ignore the rest of the modules, as they are out of scope.

But before that, let us learn a bit more about the BlockingQueue (BQ). A BQ is like any other FIFO queue data structure, i.e. the first element added to the queue is the first one to be removed. One of the main advantages of a BQ is that it is built for multi-threading: take() blocks until an element is available, and put() blocks while a bounded queue is full.

It works efficiently in producer–consumer situations: the producer thread generates data and inserts it into one end of the BQ, while the consumer thread takes from the other end.
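
To make that concrete, here is a minimal, self-contained sketch (plain Java, no Spring – the names are mine, not from the demo project) of one producer and one consumer sharing a BQ:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueDemo {

	public static void main(String[] args) {
		BlockingQueue<String> queue = new LinkedBlockingQueue<>();

		// Producer: put() hands messages over to the queue
		new Thread(() -> {
			try {
				for (int i = 0; i < 5; i++) {
					queue.put("message-" + i);
				}
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		}).start();

		// Consumer: take() blocks until a message is available
		new Thread(() -> {
			try {
				while (true) {
					System.out.println("Consumed: " + queue.take());
				}
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		}).start();
	}
}

No locks, no wait/notify – the queue does the hand-off for us. The demo below does exactly this, except that Spring manages the queue and the threads.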

Now let's write some code to implement the components we were talking about.

Note: I'm going to use Spring Boot, and I recommend you use it too – you know why 😉

Step 1 – Create a Spring Boot project

I also recommend using the Spring Initializr (start.spring.io) to generate the project skeleton, as it reduces project setup time by a lot.

Once everything is put together, the project consists of the five classes described below.

Step 2 – Create the classes below.

1). DemoProjectApplication.java

This class is auto-generated by start.spring.io when we create the Spring Boot project. We don't have to write much code in this class, but we do need to enable the MVC and async features by adding the @EnableAsync and @EnableWebMvc annotations.

package com.demos.demoProject;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;

@SpringBootApplication
@EnableAsync
@EnableWebMvc
public class DemoProjectApplication {

	public static void main(String[] args) {
		SpringApplication.run(DemoProjectApplication.class, args);
	}

}

2). QueueConfig.java

Let's create the BQ instance as a Spring bean so that it is available across the application; Spring then allows us to inject it wherever required.

And as I mentioned earlier, we are going to limit our @Async thread pool size to 20.

package com.demos.demoProject.crawler;

import java.net.URL;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class QueueConfig {

	// Create Queue as spring bean
	@Bean
	public BlockingQueue<URL> urlQueue(){
		return new LinkedBlockingQueue<URL>();
	}
	
	// Executor that backs the @Async methods in this demo
	@Bean
	public Executor getAsyncExecutor() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(20); // thread pool size 20
		executor.setMaxPoolSize(20);
		executor.setQueueCapacity(1000); // tasks queued while all 20 threads are busy
		executor.setThreadNamePrefix("MyExecutor-");
		executor.initialize();
		return executor;
	}
}
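
One design choice worth calling out: LinkedBlockingQueue is unbounded by default, so a long crawl can pile up URLs without limit. If you want back-pressure – put() blocking once the queue is full – you can bound it. This is a variant of the urlQueue bean above; the capacity is a hypothetical value, not something from the demo:

	@Bean
	public BlockingQueue<URL> urlQueue() {
		// Bounded variant: put() blocks once 10_000 URLs are waiting (hypothetical capacity)
		return new LinkedBlockingQueue<URL>(10_000);
	}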

3). ScrapperController.java

Create the controller component below, which helps us trigger the whole crawling process. We don't strictly need a controller to trigger the threads, but I feel it's easy for a demo.

package com.demos.demoProject;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.demos.demoProject.crawler.Scheduler;

@RestController
public class ScrapperController {

	private static final Logger LOGGER = LoggerFactory.getLogger(ScrapperController.class);

	@Autowired
	private BlockingQueue<URL> urlQueue;

	@Autowired
	private Scheduler scheduler;

	public void startScrapping(List<URL> seedURLs) {
		LOGGER.info("Starting crawler....");

		// Add seed URLs to blocking queue
		urlQueue.addAll(seedURLs);

		// start crawling process by calling Scheduler
		scheduler.scheduleEachUrlForDownload();
	}

	
	@RequestMapping("/trigger")
	public Optional<String> trigger() {

		List<URL> seedUrls = null;
		try {
			// create seed URL list
			seedUrls = List.of(
					new URL("https://google.com"), 
					new URL("https://yahoo.com"),
					new URL("https://stackoverflow.com")
					);
			
		} catch (MalformedURLException e) {
			e.printStackTrace();
		}

		this.startScrapping(seedUrls);

		return Optional.of("Success");
	}
}

The trigger() method creates a seed URL list and then calls the startScrapping() method to start the processing.

The startScrapping() method inserts all the URLs from the seed list into the blocking queue and then calls the Scheduler, which returns immediately while the scheduling loop runs in a separate thread (because of @Async). Note that @Async only takes effect when the method is called through the Spring proxy, i.e. from another bean, as we do here.

4). Scheduler.java

Here, whenever the @Async scheduleEachUrlForDownload() method is called, Spring runs it on a pool thread, which then enters the infinite while loop.

package com.demos.demoProject.crawler;

import java.net.URL;
import java.util.concurrent.BlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
public class Scheduler {

	private static final Logger LOGGER = LoggerFactory.getLogger(Scheduler.class);

	@Autowired
	private MultiThreadedDownloader asyncDemo;

	@Autowired
	private BlockingQueue<URL> urlQueue;

	@Async
	public void scheduleEachUrlForDownload() {

		while (true) {
			try {
				// As the URLs being added by AsyncPageScrapper, this process goes on..
				URL urlTobeCrawled = urlQueue.take();

				LOGGER.info("Scheduling URL : {}", urlTobeCrawled);

				asyncDemo.asyncDownloader(urlTobeCrawled);

			} catch (InterruptedException e) {
				// Restore the interrupt flag and exit the loop so the thread can shut down cleanly
				Thread.currentThread().interrupt();
				break;
			}
		}
	}

}

URL urlTobeCrawled = urlQueue.take();

The take() method in the line above waits for a URL to become available in the queue. As soon as a URL is inserted by any other thread (in our demo example there are none – only the first 3 seed URLs from the controller), it immediately picks it up and calls asyncDownloader(), which runs on a separate pool thread every time it is called.
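
A side note: take() blocks forever while the queue stays empty, so the scheduler thread never exits on its own. If you want it to shut down after a quiet period, poll() with a timeout is an alternative; the 30-second value below is a hypothetical choice, not part of the demo:

// Alternative to take(): give up after 30 seconds of inactivity
// (requires import java.util.concurrent.TimeUnit)
URL urlTobeCrawled = urlQueue.poll(30, TimeUnit.SECONDS);
if (urlTobeCrawled == null) {
	break; // the queue stayed empty; let the scheduler thread finish
}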

5). MultiThreadedDownloader.java

This is where our actual download code resides. Whenever the Scheduler calls it for a URL, the call runs on a new pool thread and performs all six steps mentioned at the start of the article.

package com.demos.demoProject.crawler;

import java.net.URL;
import java.util.concurrent.BlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
public class MultiThreadedDownloader {

	private static final Logger LOGGER = LoggerFactory.getLogger(MultiThreadedDownloader.class);

	@Autowired
	private BlockingQueue<URL> urlQueue;

	@Async
	public void asyncDownloader(URL urlTobeCrawled) {

		// 1. Validate the URL

		// 2. Download the page

		// 3. Extract all the information

		// 4. Separate hyper-links

		// 5. Put all the links into queue.
		
		/*
		 * for (url from htmlPage) { urlQueue.put(new URL(url)) }
		 */

		// 6. Stop

		LOGGER.info("URL crawled :: {} ", urlTobeCrawled.toString());

		try {
			Thread.sleep(3000); // pretend crawling took 3 seconds
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt(); // preserve the interrupt status instead of swallowing it
		}

	}
}

Note: I didn't write the code here to download and parse the HTML page, as my intention was to convey the threading solution.

Let's assume the HTML page is downloaded successfully and all the links are parsed. We can then add those URLs to the BQ (step 5 in the class above) so that the process continues until all the pages in the seed websites have been crawled (assuming there are no outgoing links to other websites).
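
For completeness, here is a rough sketch of what steps 1–5 could look like inside asyncDownloader(). It assumes the jsoup library is on the classpath – my assumption, not something the demo project uses:

// Hypothetical body for asyncDownloader(URL urlTobeCrawled), using jsoup
// (additional imports: org.jsoup.Jsoup, org.jsoup.nodes.Document, org.jsoup.nodes.Element)
try {
	Document htmlPage = Jsoup.connect(urlTobeCrawled.toString()).get(); // 2. download the page
	for (Element link : htmlPage.select("a[href]")) { // 3 & 4. extract and separate hyperlinks
		String href = link.absUrl("href"); // resolve relative links against the page URL
		if (href.startsWith("http")) { // 1. (very crude) URL validation
			urlQueue.put(new URL(href)); // 5. put the links into the queue
		}
	}
} catch (Exception e) {
	LOGGER.warn("Failed to crawl {}", urlTobeCrawled, e);
}

In a real crawler you would also keep a set of already-visited URLs before step 5; otherwise pages that link to each other would be crawled over and over.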

Step 3 – Run the application

Once the application is up and running, hit the http://localhost:8080/trigger URL (in a browser, or with curl) to start the crawling process.

Observation – In the log output we can see that the Scheduler keeps running under a single thread name, because it runs in an infinite loop; it takes a URL and calls MultiThreadedDownloader every time a URL object becomes available in the queue.

The MultiThreadedDownloader, on the other hand, runs on a different pool thread for each URL: in the logs, MyExecutor-2, MyExecutor-3, and MyExecutor-4 were the threads invoked for MultiThreadedDownloader.

Here MyExecutor-1 is the producer (since it supplies the URLs), and MyExecutor-2 through MyExecutor-20 (our thread pool size is 20) are the consumer threads (since they consume URLs to download and parse the HTML).

And that concludes the article. Thanks for reading! I hope you enjoyed it – if you would like to contribute or improve the content, please let me know in the comments.