diff --git a/the_book/http_server/src/bin/main.rs b/the_book/http_server/src/bin/main.rs new file mode 100644 index 0000000..5f33545 --- /dev/null +++ b/the_book/http_server/src/bin/main.rs @@ -0,0 +1,64 @@ +use http_server::ThreadPool; + +use std::fs; +use std::time::Duration; +use std::thread; +use std::io::prelude::*; +use std::net::TcpListener; +use std::net::TcpStream; + +enum ContinueServer { + Continue, + Stop, +} + +fn main() { + let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); + let pool = ThreadPool::new(4); + + for stream in listener.incoming() { + let stream = stream.unwrap(); + + //TODO: make the pool gracefully shutdown if handle_connection returns ContinueServer::Stop. + //to make the code generic, probably need to have pool.execute return a dynamic box? + pool.execute(|| { handle_connection(stream); }); + } +} + +fn handle_connection(mut stream: TcpStream) -> ContinueServer { + let mut buffer = [0; 1024]; + + stream.read(&mut buffer).unwrap(); + + //The different paths we want to watch for. + let get = b"GET / HTTP/1.1\r\n"; + let sleep = b"GET /sleep HTTP/1.1\r\n"; + let stop = b"GET /stop HTTP/1.1\r\n"; + + //return different paths, if nothing matches, returns 404. + let (status_line, filename, continue_after_request) = if buffer.starts_with(get) { + ("HTTP/1.1 200 OK", "hello.html", ContinueServer::Continue) + } else if buffer.starts_with(sleep) { + thread::sleep(Duration::from_secs(5)); + ("HTTP/1.1 200 OK", "hello.html", ContinueServer::Continue) + } else if buffer.starts_with(stop) { + ("HTTP/1.1 200 OK", "stopped.html", ContinueServer::Stop) + } else { + ("HTTP/1.1 404 NOT FOUND", "404.html", ContinueServer::Continue) + }; + + let contents = fs::read_to_string(filename).unwrap(); + let response = format!( + "{}\r\nContent-Length: {}\r\n\r\n{}", + status_line, + contents.len(), + contents, + ); + + stream.write_all(response.as_bytes()).unwrap(); + stream.flush().unwrap(); + + //returns whether the server should gracefully shutdown after this request, or keep going. + //TODO: not implemented. + continue_after_request +} diff --git a/the_book/http_server/src/lib.rs b/the_book/http_server/src/lib.rs new file mode 100644 index 0000000..956ccd2 --- /dev/null +++ b/the_book/http_server/src/lib.rs @@ -0,0 +1,117 @@ +use std::sync::mpsc; +use std::sync::Arc; +use std::sync::Mutex; +use std::thread; + +type Job = Box; + +//In order to be able to tell the workers to terminate, need to be able to send a terminate message. +enum Message { + NewJob(Job), + Terminate, +} + +pub struct ThreadPool { + workers: Vec, + sender: mpsc::Sender, +} + +impl ThreadPool { + /// create a new threadpool + /// + /// the size is the number of threads in the pool + /// + /// # Panics + /// + /// the 'new' function will panic if size is zero. + pub fn new(size: usize) -> ThreadPool { + assert!(size > 0); + + let (sender, receiver) = mpsc::channel(); + + //reciever is passed out to a bunch of threads, so need to wrap it in an Arc> to + //share it. + let receiver = Arc::new(Mutex::new(receiver)); + + let mut workers = Vec::with_capacity(size); + + for id in 0..size { + workers.push(Worker::new(id, Arc::clone(&receiver))); + } + + ThreadPool { workers, sender } + } + + ///passes the given closure to an arbitrary thread to execute. + pub fn execute(&self, f: F) + where + F: FnOnce() + Send + 'static, + { + let job = Box::new(f); + + //we send the closure as a message, and whichever thread happens to check its messages + //next starts executing it. + self.sender.send(Message::NewJob(job)).unwrap(); + } +} + +//this allows a graceful shutdown of the program, serving all remaining requests before +//terminating. +impl Drop for ThreadPool { + fn drop(&mut self) { + println!("Sending terminate message to all workers."); + + //we have to have two seperate loops because we dont send a message to a specific thread, + //we send it to whoever happens to be the first to grab it. Therefore, we could send our + //first message, thread 2 and then be stuck in a deadlock waiting for thread 3 to stop, + //which it never will as only thread 2 has been told to stop. + // + //So, we have to send all the messages and *then* join all the threads. + for _ in &self.workers { + self.sender.send(Message::Terminate).unwrap(); + } + + println!("Shutting down all workers."); + + for worker in &mut self.workers { + println!("Shutting down worker {}.", worker.id); + + if let Some(thread) = worker.thread.take() { + thread.join().unwrap() + } + } + } +} + +struct Worker { + id: usize, + thread: Option>, +} + +impl Worker { + fn new(id: usize, reciever: Arc>>) -> Worker { + //the call to recv() blocks untill a new message is avalible, so this thing isnt just + //spinning its wheels in an infinite loop. + let thread = thread::spawn(move || loop { + let message = reciever.lock().unwrap().recv().unwrap(); + + match message { + Message::NewJob(job) => { + job(); + + println!("Worker {} got job, executing.", id); + } + Message::Terminate => { + println!("Worker {} was told to terminate.", id); + + break; + } + } + }); + + Worker { + id, + thread: Some(thread), + } + } +} diff --git a/the_book/http_server/src/main.rs b/the_book/http_server/src/main.rs deleted file mode 100644 index 0419151..0000000 --- a/the_book/http_server/src/main.rs +++ /dev/null @@ -1,46 +0,0 @@ -use std::fs; -use std::time::Duration; -use std::thread; -use std::io::prelude::*; -use std::net::TcpListener; -use std::net::TcpStream; - -fn main() { - let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); - - for stream in listener.incoming() { - let stream = stream.unwrap(); - println!("Connection Established!"); - handle_connection(stream); - } -} - -fn handle_connection(mut stream: TcpStream) { - let mut buffer = [0; 1024]; - - stream.read(&mut buffer).unwrap(); - println!("Request: {}", String::from_utf8_lossy(&buffer[..])); - - let get = b"GET / HTTP/1.1\r\n"; - let sleep = b"GET /sleep HTTP/1.1\r\n"; - - let (status_line, filename) = if buffer.starts_with(get) { - ("HTTP/1.1 200 OK", "hello.html") - } else if buffer.starts_with(sleep) { - thread::sleep(Duration::from_secs(5)); - ("HTTP/1.1 200 OK", "hello.html") - } else { - ("HTTP/1.1 404 NOT FOUND", "404.html") - }; - - let contents = fs::read_to_string(filename).unwrap(); - let response = format!( - "{}\r\nContent-Length: {}\r\n\r\n{}", - status_line, - contents.len(), - contents, - ); - - stream.write_all(response.as_bytes()).unwrap(); - stream.flush().unwrap(); -} diff --git a/the_book/http_server/stopped.html b/the_book/http_server/stopped.html new file mode 100644 index 0000000..291ce2f --- /dev/null +++ b/the_book/http_server/stopped.html @@ -0,0 +1,11 @@ + + + + + Bye! + + +

Bye!

+

Bye from Rust!

+ +