#!/usr/bin/env python
## The line above tells Linux systems how to run this file. It must contain
## only the interpreter lookup: anything else on that line is handed to
## "env" as part of the program name.
"""
Morningside: Server
by Kris Schnee.

A tiny simulated world for AI testing and development.
Acts as an offline server that reads/writes text files as its interface.
Very simple physics/logic, turn- and tile-based.
Features customizable levels, multiple players, and logging.

This is the file you should run if you want to use the Morningside program.
(morningside.py is the world itself, and this is the GUI/server interface.)

Note: importing this module opens the pygame display window, because the
"screen" surface is created together with the layout constants below.
"""

##___________________________________
##          ~Header~
##___________________________________

__author__ = "Kris Schnee"
__version__ = "2009.20"
__license__ = "GPLv3 or MIT"

##___________________________________
##      ~Imported Modules~
##___________________________________

## Standard
import os
import time

## Third-party
import pygame ## From pygame.org
from pygame.locals import *

## Mine
import ringtale ## Basic program structure / main loop
import morningside ## The "worldsim" characters exist in
import art ## Auto-loading and filing of graphics inc. fonts
import driftwood ## Interface
import toomi ## Socket networking

##___________________________________
##         ~Constants~
##___________________________________

## Default keyword options handed to Server() when run as a script.
OPTIONS = {"random_terrain":None,"screenshots":True,"log_comic":False}

LEVEL_SIZE = 12,8 ## In tiles
TILE_SIZE = (50,50) ## In pixels
WORLD_VIEW_SIZE = LEVEL_SIZE[0]*TILE_SIZE[0], LEVEL_SIZE[1]*TILE_SIZE[1]

INTERFACE_WIDESCREEN = True ## Non-widescreen mode isn't really implemented.
if INTERFACE_WIDESCREEN:
    ## Console sits to the right of the world view.
    CONSOLE_LEFT = WORLD_VIEW_SIZE[0]
    CONSOLE_WIDTH = 200
    screen = pygame.display.set_mode((WORLD_VIEW_SIZE[0]+CONSOLE_WIDTH,WORLD_VIEW_SIZE[1]))
else:
    ## Console sits below the world view.
    CONSOLE_LEFT = 0
    CONSOLE_WIDTH = 200
    CONSOLE_HEIGHT = 200
    screen = pygame.display.set_mode((WORLD_VIEW_SIZE[0],WORLD_VIEW_SIZE[1]+CONSOLE_HEIGHT))

DEFAULT_OBJECT_SPRITE_NAME = "box"
DEFAULT_CHARACTER_SPRITE_NAME = "fox"
FRAME_RATE = 1 ## A very slow rate is fine for this board-game-like program.
INPUT_LISTEN_RATE = .5 ## How often to check for input? (Hz)
PLAYER_IO_DIRECTORY = "player_io" ## Login requests and other I/O files go here
SCREENSHOT_DIRECTORY = "screenshots"
SOCKET_GREETING_MESSAGE = "Connection to Morningside server v"+__version__+" established. Hello.\nIf you haven't done it already:\nTo play, send a command like: 'login name=YourName [other_option=value]'."
SOCKET_LOGIN_ERROR = "Error: You are not logged in. You need to send a command like: 'login name=YourName [other_option=value]'."
SOCKET_LOGIN_ERROR2 = "All your base are belong to us."

##___________________________________
##          ~Classes~
##___________________________________

class Server( ringtale.Framework, morningside.World, toomi.Server ):
    """The Morningside GUI/server: a world, a main loop and a socket server.

    Players talk to the server in one of two ways:
      * Text files in PLAYER_IO_DIRECTORY ("login_*.txt" to join, then the
        per-character input/output files named by the Character object).
      * Sockets: connect, send "login name=YourName [option=value]", then
        send one command per round.

    The only program state defined here is "Main" (MainSetup / MainLogic /
    MainDraw).
    """

    def __init__(self,**options):
        """Build the world, start listening for sockets and enter a state.

        Recognized options (all optional) that are read in this file:
          internet_host, port -- passed on to toomi.Server.__init__
          screenshots         -- save a screenshot each round if true
          random_terrain      -- see SetUpMap
          starting_state      -- name of the first state (default "Main")
        The whole option set is also passed to morningside.World.__init__.
        """
        ringtale.Framework.__init__(self)
        morningside.World.__init__(self,**options)
        toomi.Server.__init__(self,options.get("internet_host",False),options.get("port",toomi.PORT))

        ## Graphics: Other graphics are auto-loaded by the art module, but
        ## sprites aren't, because they're non-animated and lack the sprite
        ## data sheets that that module expects.
        art.sprites = {}
        sprite_directory = os.path.join("graphics","sprites")
        for filename in os.listdir(sprite_directory):
            path = os.path.join(sprite_directory,filename)
            ## Skip subdirectories and hidden files (version-control folders,
            ## OS metadata), which pygame can't load as images.
            if filename.startswith(".") or not os.path.isfile(path):
                continue
            image = pygame.image.load(path).convert()
            image.set_colorkey((0,255,0)) ## Pure green is transparent.
            art.sprites[filename.split(".")[0]] = image

        self.screenshots = options.get("screenshots",False)

        ## Server features: This notion of time is real-world time, not the
        ## game round.
        self.wait_time = 1.0 / INPUT_LISTEN_RATE
        self.next_ready_time = time.time() + self.wait_time
        self.framerate = int(FRAME_RATE) ## Used by pygame.time.Clock within Ringtale

        self.SetUpMap(**options)

        self.PushState(options.get("starting_state","Main"))

    def SetUpMap(self,**options):
        """Load appropriate map given options. Used on start & reset.

        If options["random_terrain"] is true, it is passed to
        SetRandomTerrain. Otherwise, if PLAYER_IO_DIRECTORY contains
        "use_this_level.txt", that file's content names the level to load.
        """
        ## Load a layout from a file, or build randomly if requested.
        if options.get("random_terrain"):
            rt = options["random_terrain"]
            self.SetRandomTerrain(rt)
            self.level_name = "Random: "+str(rt)
        else:
            ## Try to load a level based on a file in the I/O directory.
            level_file = os.path.join(PLAYER_IO_DIRECTORY,"use_this_level.txt")
            if os.path.exists(level_file):
                with open(level_file) as f:
                    ## Strip the newline most editors leave at end of file,
                    ## so it doesn't become part of the level name.
                    level_name = f.read().strip()
                self.LoadLevel(level_name)

        self.Log("setup","Server starting at time: "+time.strftime(morningside.TIME_FORMAT,time.localtime()))

    def DeleteLeftoverOutputFiles(self):
        """On startup, clear any old outputs to avoid any confusion.

        Removes every "to_*.txt" file in PLAYER_IO_DIRECTORY. The directory
        is created if it doesn't exist yet.
        """
        if not os.path.isdir(PLAYER_IO_DIRECTORY):
            os.makedirs(PLAYER_IO_DIRECTORY)
            return
        for filename in os.listdir(PLAYER_IO_DIRECTORY):
            if filename.startswith("to_") and filename.endswith(".txt"):
                os.remove(os.path.join(PLAYER_IO_DIRECTORY,filename))

    def LoadCharacters(self):
        """Load all characters that have login files.

        Look for all files named "login_*.txt". For each one found, a
        character will be created, and the server will look for input
        files called "from_*.txt".

        Each non-blank line of a login file that doesn't start with "#" and
        contains a colon is read as "key: value" and passed to
        CreateCharacter as a keyword option.
        """
        if not os.path.isdir(PLAYER_IO_DIRECTORY):
            return ## Nobody can have asked to log in.

        ## Sorted so that character order (and thus turn order) doesn't
        ## depend on the arbitrary order os.listdir returns.
        login_requests = sorted(f for f in os.listdir(PLAYER_IO_DIRECTORY)
                                if f.startswith("login_") and f.endswith(".txt"))
        for filename in login_requests:
            character_name = filename[6:-4].title() ## login_miles.txt -> "Miles"
            with open(os.path.join(PLAYER_IO_DIRECTORY,filename),"r") as f:
                content = f.read()

            options = {"name":character_name}
            ## Read each line as a key:value pair. (Ignore lines starting w/"#")
            ## Note: options are read as text, so numeric values need int().
            for line in content.splitlines():
                if not line or line.startswith("#") or ":" not in line:
                    continue
                ## Split on the first colon only, so values may contain colons.
                key, value = line.split(":",1)
                options[key.strip()] = value.strip()
            options.setdefault("image",DEFAULT_CHARACTER_SPRITE_NAME)

            ## Create a Character and add it to the world.
            self.CreateCharacter(**options)

    def WriteOutputFile(self,character):
        """Send sensory input to a character, by socket or by text file.

        The text is a header line, then system messages ("$ " prefix), the
        result of SenseWorld, then sensed events ("* " prefix). Afterwards
        the character's pending input is cleared with ClearInput().
        """
        parts = ["# Morningside output file for character "+str(character.ID)+", "+character.name+"\n"]
        parts.extend("$ "+event+"\n" for event in character.system_messages)
        parts.append(self.SenseWorld(character))
        parts.extend("* "+event+"\n" for event in character.sensed_events)
        text = "".join(parts)

        ## A jury-rigged way of identifying this character as socket-based.
        if hasattr(character,"socket"):
            character.socket.send(text)
        else:
            with open(os.path.join(PLAYER_IO_DIRECTORY,character.output_file_name),"w") as output_file:
                output_file.write(text)

        character.ClearInput()

    def UpdateCommentBox(self):
        """Refresh the "comments" widget: one status line per character.

        Each line is the name, the energy in parentheses if the "energy"
        rule is on, and then the first word of the pending command,
        "[Dead!]" or "[Waiting]".
        """
        lines = []
        for c in self.characters:
            line = c.name
            if self.rules["energy"]:
                line += " ("+str(c.energy)+")"
            line += ": "
            if c.command:
                line += c.command.split(" ")[0]
            elif not c.alive:
                line += "[Dead!]"
            else:
                line += "[Waiting]"
            lines.append(line+"\n")
        if not self.players:
            lines.append("No one's playing.")
        self.interface_index["comments"].SetText("".join(lines))

    def UpdateDialogBox(self,who,what):
        """Append "<name>: <what>" to the scrolling "dialog" widget."""
        self.interface_index["dialog"].AddText("\n"+who.name+": "+what)

    def TakeScreenshot(self,fullscreen=False):
        """Save the current frame as SCREENSHOT_DIRECTORY/time_<round>.bmp.

        fullscreen -- if true save the whole window (world plus console);
                      otherwise only the world view.
        Both variants go to SCREENSHOT_DIRECTORY, which is where MakeComic
        looks for them; the directory is created if needed.
        """
        if not os.path.isdir(SCREENSHOT_DIRECTORY):
            os.makedirs(SCREENSHOT_DIRECTORY)
        path = os.path.join(SCREENSHOT_DIRECTORY,"time_"+str(self.time)+".bmp")
        if fullscreen:
            pygame.image.save(screen,path)
        else:
            screenshot = pygame.surface.Surface(WORLD_VIEW_SIZE)
            screenshot.blit(screen,(0,0),(0,0,WORLD_VIEW_SIZE[0],WORLD_VIEW_SIZE[1]))
            pygame.image.save(screenshot,path)

    def MakeComic(self):
        """Turn the saved screenshots into captioned "comic" panels.

        For each round 0..self.time, the screenshot time_<n>.bmp gets the
        lines of self.comic_script[n] drawn underneath it and is saved as
        time_<n>_comic.bmp. Rounds with no screenshot on disk are skipped;
        rounds with no script entry get an empty caption.
        """
        caption_height = 200
        textbox = driftwood.TextBox(coords=(0,0,WORLD_VIEW_SIZE[0],caption_height),text="",lines_displayed=6)
        self.textbox = textbox ## For debugging

        for n in range(self.time + 1):
            source = os.path.join(SCREENSHOT_DIRECTORY,"time_"+str(n)+".bmp")
            if not os.path.exists(source):
                continue ## E.g. screenshots were switched off that round.
            shot = pygame.image.load(source)
            panel = pygame.surface.Surface((WORLD_VIEW_SIZE[0],WORLD_VIEW_SIZE[1]+caption_height))
            panel.fill((255,255,255))
            panel.blit(shot,(0,0))

            panel_script = []
            if n < len(self.comic_script):
                panel_script = self.comic_script[n]
            text = "".join(line+"\n" for line in panel_script)
            print(text)
            textbox.SetText(text)
            textbox.Redraw()
            panel.blit(textbox.surface,(0,WORLD_VIEW_SIZE[1]))
            pygame.image.save(panel,os.path.join(SCREENSHOT_DIRECTORY,"time_"+str(n)+"_comic.bmp"))

    def HandleNewConnections(self,rejoiners=None,newcomers=None):
        """Register freshly connected sockets in self.clients.

        rejoiners -- (socket, address) pairs whose address is already in
                     self.clients: the old socket is closed and replaced.
        newcomers -- (socket, address) pairs for new addresses: they get a
                     client record with no character and are sent the
                     greeting, which explains how to log in.
        """
        for socket, address in (rejoiners or ()):
            self.clients[address]["socket"].close() ## Close old socket
            self.clients[address]["socket"] = socket ## Replace socket
            ## time.time must be called; int() of the function object
            ## itself raises TypeError.
            self.clients[address]["last_input"] = int(time.time())
        for socket, address in (newcomers or ()):
            self.clients[address] = {"name":"[Not Logged In]",
                                     "socket":socket,
                                     "character":None,
                                     }
            socket.send(SOCKET_GREETING_MESSAGE)
            self.Log("socket","A client has joined from address "+str(address)+".")

    def Reset(self):
        """Tell every character "RESET", then empty and rebuild the world."""
        for c in self.characters:
            c.SenseSystemMessage("RESET")
        self.world = []
        self.CreateBlankGrid()
        self.entities = {}
        self.characters = []
        self.highest_ID = 0
        if self.level_name:
            self.LoadLevel(self.level_name)
        self.clients = {} ## Socket players need to log in again.

    ##### STATES #####

    #### MAIN: Run the server, waiting for and reacting to input.
    def MainSetup(self):
        """Build the console widgets, load file-based players, start round 0."""
        self.AddWidget(driftwood.Widget(name="level_name",coords=(CONSOLE_LEFT,0,CONSOLE_WIDTH,50),text=self.level_name))
        self.AddWidget(driftwood.Widget(name="time",coords=(CONSOLE_LEFT,50,CONSOLE_WIDTH,50),text="Time: "+str(self.time)))
        self.AddWidget(driftwood.TextBox(name="comments",coords=(CONSOLE_LEFT,100,CONSOLE_WIDTH,100),font="EagerNaturalist14"))
        self.AddWidget(driftwood.Scroll(name="dialog",coords=(CONSOLE_LEFT,200,CONSOLE_WIDTH,WORLD_VIEW_SIZE[1]-200),font="EagerNaturalist14"))
        ## "Next" button: bottom-left corner of the console area.
        self.AddWidget(driftwood.Button(owner=self,name="bNext",coords=(CONSOLE_LEFT,WORLD_VIEW_SIZE[1]-50,100,50),text="Next"))

        self.DeleteLeftoverOutputFiles()
        self.LoadCharacters()

        ## Create a first set of output files.
        self.players = [c for c in self.characters if c.alive]
        for character in self.players:
            self.WriteOutputFile(character)

        self.UpdateCommentBox()

        if self.screenshots:
            self.MainDraw()
            self.TakeScreenshot()

    def MainLogic(self,unhandled_events=None):
        """One pass of the server loop.

        Handles keyboard events (Escape leaves the state, R resets), the
        "Next" button, socket connections and input, and the periodic check
        of players' input files. When at least one player is alive and
        every living player has a command set, the round is played out.
        """
        if self._HandleKeyEvents(unhandled_events or ()):
            return ## The state was popped or restarted.

        for message in self.messages:
            ## When "Next" is hit, skip any players who've not set a command.
            if message == "bNext":
                for p in self.players:
                    if not p.command:
                        p.command = "idle"

        self._HandleSocketInput()

        t = time.time()
        if t >= self.next_ready_time:
            self.next_ready_time = t + self.wait_time
            self._PollCommandFiles()

        ## If someone's playing and everyone (alive) has set a command.
        ## Dead characters are ignored here: nothing in this file sets a
        ## command for them, so waiting on them would stall the game.
        if self.players and all(p.command for p in self.players):
            self._RunRound()

    def _HandleKeyEvents(self,events):
        """React to Escape/R key presses; return True if the state changed."""
        for event in events:
            if event.type != KEYDOWN:
                continue
            if event.key == K_ESCAPE:
                self.PopState()
                return True
            elif event.key == K_r: ## Reset
                self.Log("system","*** RESET ***")
                morningside.World.__init__(self,**OPTIONS)
                self.SetUpMap(**OPTIONS)
                self.PopState()
                self.PushState("Main")
                return True
        return False

    def _HandleSocketInput(self):
        """Accept new/rejoining sockets, then process each client's input."""
        rejoiners, newcomers = self.CheckForConnections()
        if rejoiners or newcomers:
            ## Mark them as present but don't give them a Character yet.
            self.HandleNewConnections(rejoiners,newcomers)

        inputs = self.CheckForInput()
        for address in inputs:
            client = self.clients.get(address)
            if not client:
                raise SystemError("Getting input from a client not connected? This shouldn't happen.")
            text = inputs[address]
            if client.get("character"):
                ## Logged in and sending a command for their Character.
                client["character"].SetCommand(text)
            else:
                ## Not logged in yet. Hopefully they sent a login string.
                self._LogInSocketClient(client,text)

    def _LogInSocketClient(self,client,text):
        """Create a Character from a "login key=value ..." string, or reply with an error."""
        ## split() with no argument tolerates repeated spaces and a trailing
        ## newline.
        words = text.split()
        if not words or words[0] != "login":
            client["socket"].send(SOCKET_LOGIN_ERROR)
            return
        try:
            options = {"image":DEFAULT_CHARACTER_SPRITE_NAME}
            for word in words[1:]:
                k, v = word.split("=",1)
                options[k] = v
            c = self.CreateCharacter(**options)
            client["character"] = c
            c.socket = client["socket"] ## Put a reference in.
            ## Refresh the player list first: UpdateCommentBox reads it to
            ## decide whether to show "No one's playing."
            self.players = [ch for ch in self.characters if ch.alive]
            self.UpdateCommentBox() ## List them in the interface.
            ## Send them an initial output.
            self.WriteOutputFile(c)
        except Exception:
            ## Malformed option, or the world refused the character.
            ## (KeyboardInterrupt/SystemExit are deliberately not caught.)
            client["socket"].send(SOCKET_LOGIN_ERROR2)

    def _PollCommandFiles(self):
        """Read and delete the input file of each player still lacking a command."""
        for c in self.players:
            if c.command:
                continue
            path = os.path.join(PLAYER_IO_DIRECTORY,c.input_file_name)
            if os.path.exists(path):
                with open(path,"r") as f:
                    message = f.read()
                os.remove(path)
                ## self.Log("input",c.name+" sent: "+message)
                c.SetCommand(message)
        self.UpdateCommentBox()

    def _RunRound(self):
        """Carry out every player's command, send out the results, advance time."""
        ## The order is determined by self.characters' order.
        for character in self.players:
            ## Keep a copy: the command is cleared right after it's carried
            ## out, but is still needed for the dialog display below.
            command = character.command
            self.TakeCommand(character,command)
            character.command = ""

            ## Special display: Show things being said.
            if command.startswith("say"):
                said = " ".join(command.split(" ")[1:])
                self.UpdateDialogBox(character,said)

        ## Report their energy level, if applicable.
        if self.rules["energy"]:
            for character in self.players:
                character.SenseSystemMessage("energy: "+str(character.energy))

        ## Write sense output files, including the news.
        for character in self.players:
            self.WriteOutputFile(character)

        ## Update my list of living characters (!).
        self.players = [c for c in self.characters if c.alive]

        ## Server upkeep.
        self.time += 1
        self.interface_index["time"].SetText("Time: "+str(self.time))

        ## Set up the "comic script".
        self.comic_script.append([])

        ## Refresh "waiting for commands" display.
        self.UpdateCommentBox()

        if self.screenshots:
            self.MainDraw()
            self.TakeScreenshot()

    def MainDraw(self):
        """Draw every tile, whatever stands on it, and then the widgets."""
        tile_width, tile_height = TILE_SIZE
        for x in range(self.size[0]):
            for y in range(self.size[1]):
                tile = self.world[x][y]
                position = (x*tile_width,y*tile_height)
                screen.blit(art.tiles[tile.terrain],position)
                if tile.contents:
                    screen.blit(art.sprites[tile.contents.image],position)

        for widget in self.interface:
            widget.Draw()


##___________________________________
##         ~Functions~
##___________________________________


##__________________________________
##          ~Autorun~
##__________________________________

if __name__ == "__main__":
    s = Server(**OPTIONS)
    try:
        s.MainLoop()
    finally:
        ## Shut down and save the log even if the main loop raised.
        s.ShutDownServer(False)
        s.WriteLog()