Source code for chessengine

'''A Qt-compatible interface similar to `chess.engine`_ 
 
.. _chess.engine: https://pypi.org/project/chess
'''

import os,  os.path
import time
import re
from typing import Any, Dict, Callable, Iterable, List, Optional, Tuple, Type, Union
import sys

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import MzChess

if MzChess.useQt5():
 from PyQt5 import QtCore
else:
 from PyQt6 import QtCore

import chess
import chess.engine

PGNCmd_REGEX = re.compile(r'\[(%[a-z]*)?[ ]+([^\n\t \]]+)\]')
PGNEval_REGEX = re.compile(r'\[(%|%eval)[ ]+([^\n\t \]]+)\]')

[docs]class ChessEngine(QtCore.QObject): '''A Universal Chess Interface (`UCI`_) engine using (`QProcess`_) :param engine2Option: a pair of a path to the executable and *dict* of the changed options :param limit: a limit definition for the engine (see `chess.engine.Limit`_) :param timeout_msec: timeout for the engine to respond (should be at least 1000) :param log: log for the engine's commands :param ChessEngine.bestMoveScoreSignal: ``pyqtSignal`` emitted when a best move is available .. _UCI: http://wbec-ridderkerk.nl/html/UCIProtocol.html .. _QProcess: https://doc.qt.io/qt-5/qprocess.html .. _chess.engine.Limit: https://python-chess.readthedocs.io/en/latest/engine.html ''' bestMoveScoreSignal : QtCore.pyqtSignal = QtCore.pyqtSignal(chess.Move, str) def __init__(self, engine2Option : Tuple[os.PathLike, Dict[str, Union[str, int, bool, None]]], limit : chess.engine.Limit = chess.engine.Limit(depth = 10), timeout_msec : int = 1000, log : Optional[Callable[[str], None]] = None, parent : Optional[QtCore.QObject] = None) -> None: super(ChessEngine, self).__init__(parent) executable, optionsDict = engine2Option if not (os.path.isfile(executable) and os.access(executable, os.X_OK )): raise IOError('ChessEngine: {} is not an executable'.format(executable)) self.limit = limit self.setLog(log) self.timeout_msec = max(int(timeout_msec), 10) self.stdoutLines = list() self.p = QtCore.QProcess() # Keep a reference to the QProcess (e.g. on self) while it's running. self.p.readyReadStandardOutput.connect(self._fromStdout) self.p.readyReadStandardError.connect(self._fromStdout) self.stdout = '' self.stdoutLines = list() self.p.start(executable, [], QtCore.QIODevice.OpenModeFlag.ReadWrite | QtCore.QIODevice.OpenModeFlag.Text) while not self.p.waitForStarted(msecs = self.timeout_msec): self._log('ChessEngine: slow write, {} msec elapsed'.format(self.timeout_msec)) self.readyok = True self._toStdin('uci') start = int(round(time.time() * 1000)) loop = 1 while 'idDict' not in vars(self) or 'optionsDict' not in vars(self): QtCore.QCoreApplication.processEvents() elapsed = int(round(time.time() * 1000)) - start if elapsed > 3*self.timeout_msec: self._log('ChessEngine: long wait for uci response, {}x looped, {} msec elapsed'.format(loop, elapsed)) break loop += 1 if self.isReady(): for name, value in optionsDict.items(): self.uciSetOption(name, value) self.isrunning = False self.playResult = None self.board = chess.Board()
[docs] def kill(self, beSilent : bool = False) -> None: '''Kills the process, engine cannot be used anymore ''' if 'p' in vars(self): self.p.kill() isFinished = self.p.waitForFinished(1000) if not (isFinished and beSilent): raise IOError('Cannot stop ChessEngine ...') self.p = None
[docs] def setLog(self, log : Optional[Callable[[str], None]] = None) -> None: '''Sets the log for engine's commands :param log: log for the engine's commands, default = None ''' if isinstance(log, bool): if log: self.notify = print else: self.notify = None else: self.notify = log
[docs] def getScore(self, hintID : int = 0) -> Optional[int]: '''Delivers the score for a hint, if available :param hintID: hint, i.e. alternative to be used (0 for best move) :returns: score in centipawns or ``None`` ''' if 'info' not in vars(self.playResult) \ or hintID >= len(self.playResult.info) \ or 'score' not in self.playResult.info[hintID]: return None score = self.playResult.info[hintID]['score'].white() if isinstance(score, chess.engine.Cp): score = '{:+.1f}'.format(0.01 * score.score()) elif score is not None: score = str(score) return score
def _log(self, txt : str) -> None: if self.notify is not None: self.notify(txt) def _fromStdout(self) -> None: if self.p is None: return if 'stdout' not in vars(self): self.stdout = '' try: self.stdout += bytes(self.p.readAllStandardOutput()).decode('utf-8') except: return if not self.stdout.endswith('\n'): return self.stdoutLines += self.stdout.strip("\n").split("\n") self._log('ChessEngine/_fromStdout: stdout< {}'.format(self.stdout)) self.stdout = '' if self.stdoutLines[-1] == 'uciok': self._parseHeader() self.readyok = True self.stdoutLines = list() elif self.stdoutLines[-1] == 'readyok': self.readyok = True self.stdoutLines = list() elif self.stdoutLines[-1].startswith('bestmove'): self._parseResult() score = self.getScore(hintID = 0) self.readyok = True self.stdoutLines = list() if len(self.playResult.info) == 1: self.playResult.info = self.playResult.info[0] if self.playResult.move is not None: self.bestMoveScoreSignal.emit(self.playResult.move, score) elif score is not None: self.bestMoveScoreSignal.emit(chess.Move.null(), score) def _fromStderr(self) -> None: stderr = bytes(self.p.readAllStandardError()).decode('utf-8').strip("\n") self._log('ChessEngine/_fromStderr:???????????????\n <stderr: {} \n???????????????'.format(stderr)) def _toStdin(self, txt : str) -> bool: if self.p is None: return False self._log('ChessEngine/_toStdin: stdin> {}, readyok = {}'.format(txt, self.readyok)) if not self.readyok: return False self.readyok = False self.stdoutLines = list() self.p.write("{}\n".format(txt).encode('utf-8')) while not self.p.waitForBytesWritten(msecs = self.timeout_msec): self._log('ChessEngine/_toStdin: slow write, {} msec elapsed'.format(self.timeout_msec)) return True
[docs] def isReady(self) -> bool: '''Checks whether the engine is able to receive commands :returns: boolean indicating the response ''' return self.p.state() == QtCore.QProcess.ProcessState.Running and self.readyok
# Parser -------------------------------------------------------------------------------------- def _parseHeader(self) -> None: self.idDict = dict() self.optionsDict = dict() for line in self.stdoutLines: tokens = line.split(" ") report = tokens.pop(0) if report == 'id': key = tokens.pop(0) if key != 'name' and key != 'author': raise ValueError("UIE: Unknown {} in header (expected name or author)".format(key)) self.idDict[key] = ' '.join(tokens) elif report == 'option': self._parseOption(tokens) elif len(report.strip(' ')) > 0 and report != 'uciok': self._log("Extra line '{}'".format(line)) def _parseOption(self, tokens) -> None: optDict = dict() minValue = None maxValue = None name = None vtype = None while tokens: key = tokens.pop(0) if key == 'name': nameList = list() while tokens: part = tokens[0] if part in ['type', 'default', 'min', 'max', 'var']: break tokens.pop(0) nameList.append(part) name = ' '.join(nameList) elif key == 'type': vtype = tokens.pop(0) if vtype not in ['check', 'spin', 'combo', 'button', 'string']: raise ValueError("UIE: Unexpected option type '{}'".format(vtype)) if vtype == 'combo': optDict['varList'] = list() optDict[key] = vtype elif key == 'default': value = tokens.pop(0) if vtype == 'spin': optDict[key] = int(value) elif value == 'check': optDict[key] = value == 'true' else: optDict[key] = value elif key == 'min': minValue = int(tokens.pop(0)) elif key == 'max': maxValue = int(tokens.pop(0)) elif key == 'var': if 'varList' not in optDict: raise ValueError("UIE: var found, not type 'combo' ?!?") optDict['varList'].append(tokens.pop(0)) elif key == 'string': if len(tokens) > 0: value = tokens.pop(0) else: value = '' if value == '<empty>': value = '' optDict[key] = value if name is None or vtype is None: raise ValueError("UIE: name and/or type record missing") if vtype == 'spin' and minValue is not None and maxValue is not None: optDict['range'] = range(minValue, maxValue+1) self.optionsDict[name] = optDict def _parseResult(self) -> None: depth = 0 if self.playResult is None or 'move' in vars(self.playResult): self.playResult = chess.engine.PlayResult(None, None, draw_offered = None, resigned = None) self.playResult.info = list() for line in reversed(self.stdoutLines): tokens = line.split(" ") report = tokens.pop(0) if report == 'info': actInfo, newDepth = self._parseInfoline(tokens, depth) if newDepth is None or len(actInfo) == 0 or 'score' not in actInfo: continue if newDepth < depth: self.playResult.info = list(reversed(self.playResult.info)) break depth = newDepth self.playResult.info.append(actInfo) elif report == 'bestmove': try: self.playResult.move = chess.Move.from_uci(tokens.pop(0)) except: self.playResult.move = None if len(tokens) > 0 and tokens.pop(0) == 'ponder': try: self.playResult.ponder = chess.Move.from_uci(tokens.pop(0)) except: self.playResult.ponder = None def _parseInfoline(self, tokens, depth) -> Tuple[Dict[str, Any], int]: info = dict() actDepth = None while tokens: parameter = tokens.pop(0) if parameter == "string": info["string"] = " ".join(tokens) return dict(), depth elif parameter == "depth": actDepth = int(tokens.pop(0)) info[parameter] = actDepth elif parameter in ["seldepth", "nodes", "multipv", "currmovenumber", "hashfull", "nps", "tbhits", "cpuload"]: info[parameter] = int(tokens.pop(0)) # type: ignore elif parameter == "time": info["time"] = int(tokens.pop(0)) // 1000.0 elif parameter == "ebf": info["ebf"] = float(tokens.pop(0)) elif parameter == "score": kind = tokens.pop(0) value = tokens.pop(0) if tokens and tokens[0] in ["lowerbound", "upperbound"]: info[tokens.pop(0)] = True # type: ignore if kind == "cp": info["score"] = chess.engine.PovScore(chess.engine.Cp(int(value)), self.board.turn) elif kind == "mate": info["score"] = chess.engine.PovScore(chess.engine.Mate(int(value)), self.board.turn) else: raise ValueError("UIE: Unknown score kind {} in info (expected cp or mate)".format(kind)) elif parameter == "currmove": info["currmove"] = chess.Move.from_uci(tokens.pop(0)) elif parameter == "currline": cpunr = int(tokens.pop(0)) currline = list() for n in range(cpunr): currline.append(tokens.pop(0)) info["currline"] = currline elif parameter == "refutation": info["refutation"] = [] while tokens: try: move = chess.Move.from_uci(tokens[0]) tokens.pop(0) info["refutation"].append(move) except: break elif parameter == "pv": info["pv"] = list() while tokens: try: move = chess.Move.from_uci(tokens[0]) tokens.pop(0) info["pv"].append(move) except: break elif parameter == "wdl": info["wdl"] = chess.engine.PovWdl(chess.engine.Wdl(int(tokens.pop(0)), int(tokens.pop(0)), int(tokens.pop(0))), self.board.turn) return info, actDepth # Methods for use --------------------------------------------------------------------------------------
[docs] def uciNewGame(self, fen : Optional[str] = None, moves : List[chess.Move] = []) -> bool: '''Wrapper for the UCI *ucinewgame* command :param fen: starting position in Forsyth-Edwards-Notation (FEN) :param moves: list of moves to be applied :returns: boolean indicating the success ''' self.playResult = None if not self._toStdin('ucinewgame'): return False builder = ["position"] if fen is not None: self.board.set_fen(fen) builder.append("fen") builder.append(fen) else: self.board.set_fen(chess.STARTING_FEN) builder.append("startpos") for move in moves: builder.append(move.uci()) self.readyok = True self._toStdin(' '.join(builder)) self.readyok = True return True
[docs] def uciSetOption(self, name : str, value : Union[bool, int, str, None]) -> bool: '''Wrapper for the UCI *setoption* command :param name: name of the option :param value: value, type depends on name :returns: boolean indicating the success ''' if name not in self.optionsDict: raise ValueError('ChessEngine/setOption: {} is not a valid option name'.format(name)) optDict = self.optionsDict[name] optType = optDict['type'] builder = ['setoption'] builder.append('name') builder.append(name) builder.append('value') if optType == 'button' and value is not None: raise ValueError('ChessEngine/setOption: option name {} expects no value'.format(name)) elif optType == 'check': if not isinstance(value, bool): raise ValueError('ChessEngine/setOption: option name {} expects an boolean value'.format(name)) builder.append(['false', 'true'][value]) elif optType == 'spin': if isinstance(value, str): value = int(value) if 'range' in optDict and value not in optDict['range']: raise ValueError('ChessEngine/setOption: option name {}: value {} not in range {}'.format(name, value, optDict['range'])) builder.append(str(value)) elif optType == 'combo': if not isinstance(value, str): raise ValueError('ChessEngine/setOption: option name {} expects an string value'.format(name)) if 'varList' in optDict and value not in optDict['varList']: raise ValueError('ChessEngine/setOption: option name {}: value {} not in list {}'.format(name, value, optDict['varList'])) builder.append(value) elif Type == 'string': if not isinstance(value, str): raise ValueError('ChessEngine/setOption: option name {} expects an string value'.format(name)) if len(value) == 0: value == '<empty>' builder.append(value) if self._toStdin(' '.join(builder)): self.readyok = True return self.readyok
[docs] def uciGo(self, search_moves : Optional[Iterable[chess.Move]] = None, ponder : bool = False, infinite : bool = False) -> bool: '''Wrapper for the UCI *go* command :param search_moves: list of moves to be searched :param ponder: suggest a response to the *best-move* :param infinite: improve the *best-move* continuously :returns: boolean indicating the success ''' builder = ["go"] if ponder: builder.append("ponder") if self.limit.white_clock is not None: builder.append("wtime") builder.append(str(max(1, int(self.limit.white_clock * 1000)))) if self.limit.black_clock is not None: builder.append("btime") builder.append(str(max(1, int(self.limit.black_clock * 1000)))) if self.limit.white_inc is not None: builder.append("winc") builder.append(str(int(self.limit.white_inc * 1000))) if self.limit.black_inc is not None: builder.append("binc") builder.append(str(int(self.limit.black_inc * 1000))) if self.limit.remaining_moves is not None and int(self.limit.remaining_moves) > 0: builder.append("movestogo") builder.append(str(int(self.limit.remaining_moves))) if self.limit.depth is not None: builder.append("depth") builder.append(str(max(1, int(self.limit.depth)))) if self.limit.nodes is not None: builder.append("nodes") builder.append(str(max(1, int(self.limit.nodes)))) if self.limit.mate is not None: builder.append("mate") builder.append(str(max(1, int(self.limit.mate)))) if self.limit.time is not None: builder.append("movetime") builder.append(str(max(1, int(self.limit.time * 1000)))) if infinite: builder.append("infinite") if search_moves is not None: builder.append("searchmoves") builder.extend(move.uci() for move in search_moves) if not self._toStdin(' '.join(builder)): return False if infinite: self.isrunning = True self.readyok = True return True
[docs] def uciStop(self) -> bool: '''Wrapper for the UCI *stop* command to stop an infinite *go* :returns: boolean indicating the success ''' if not self.isrunning: return False return self._toStdin('stop')
[docs] def uciQuit(self) -> None: 'Terminate the engine' self._toStdin('quit')
# chess.engine like calls --------------------------------------------------------------------------------------
[docs] def startPlay(self) -> bool: '''Emits *uciGO* in *play* mode, i.e.g * *UCI_AnalyseMode = off* * *MultiPV = 1*, i.e. no alternative move suggestions :returns: boolean indicating the success ''' if 'MultiPV' in self.optionsDict: if not self.uciSetOption('MultiPV', 1): return False if 'UCI_AnalyseMode' in self.optionsDict: if not self.uciSetOption('UCI_AnalyseMode', False): return False return self.uciGo()
[docs] def startAnalysis(self, multiPV : int = 1) -> bool: '''Emits *uciGO* in *analyse* mode, i.e. * *UCI_AnalyseMode = off* * *MultiPV =* ``multiPV`` :param multiPV: number of alternative move suggestions :returns: boolean indicating the success ''' if 'MultiPV' in self.optionsDict: if not self.uciSetOption('MultiPV', max(1, multiPV)): return False if 'UCI_AnalyseMode' in self.optionsDict: if not self.uciSetOption('UCI_AnalyseMode', True): return False return self.uciGo()
def setELO(self, elo : Union[int, str]) -> bool: if not ('UCI_Elo' in self.optionsDict and 'UCI_LimitStrength' in self.optionsDict): return False eloDict = self.optionsDict['UCI_Elo'] if elo == 'max': elo =eloDict['range'].stop - 1 elif elo == 'min': elo = eloDict['range'].start if elo not in eloDict['range']: raise ValueError('ChessEngine: elo {} not in {}'.format(elo, eloDict['range'])) if not self.uciSetOption('UCI_Elo', elo): return False if not self.uciSetOption('UCI_LimitStrength', True): return False self.readyok = True return True
# --------------------------------------------------------------------- # --------------------------------------------------------------------- if __name__ == "__main__": os.chdir('C:/Users/Reinh/chess_engines') stockfish12 = 'stockfish_12_win_x64_bmi2/stockfish_20090216_x64_bmi2.exe' togaII4 = 'TogaII40/Windows/TogaII_40_intelPGO_x64.exe' komodo12 = 'komodo-12.1.1_5a8fc2/Windows/komodo-12.1.1-64bit-bmi2.exe' fritz17 = 'Fritz17/Fritz17.exe' lcZero26 = 'lc0-v0.26.3-windows-gpu-opencl/lc0.exe' raubfischGTZ23 = 'RaubfischX44_and_GTZ23/GTZ23/RaubfischGTZ23_nn_sl-bmi2.exe' executable = stockfish12 def waitForData(engine): for i in range(100): QtCore.QThread.msleep(engine.timeout_msec) QtCore.QCoreApplication.processEvents() if engine.isReady(): return print('----> waiting {} * {} ms'.format(i, engine.timeout_msec)) def printPlayResult(engine): playResult = engine.playResult rootBoard = chess.Board(engine.board.fen()) print('{} : {}'.format('best move', playResult.move)) if playResult.ponder is not None: print('{} : {}'.format('expected response', playResult.ponder)) if isinstance(playResult.info, dict): infoList = [playResult.info] else: infoList = playResult.info for i, infoDict in enumerate(infoList): print('info #{}'.format(i+1)) for key, value in infoDict.items(): if key == 'pv': pvString = '' for n, moveList in enumerate(value): if not isinstance(moveList, list): moveList = [moveList] for move in moveList: rootBoard.push(move) uciMove = move.uci() if rootBoard.is_checkmate(): uciMove += '#' elif rootBoard.is_check(): uciMove += '+' if engine.board.turn: if n == 0: pvString += '1.{}'.format(uciMove) elif n % 2 == 0: pvString += ' {}.{}'.format(n//2+1, uciMove) else: pvString += ' {}'.format(uciMove) else: if n == 0: pvString += '1...{}'.format(uciMove) elif n % 2 == 1: pvString += ' {}.{}'.format((n+1)//2+1, uciMove) else: pvString += ' {}'.format(uciMove) print('{} : {}'.format(key, pvString)) elif value is not None: print('{} : {}'.format(key, value)) def runChessEngine(): global executable fenList = [ "3rr3/2p3p1/4N3/3b4/1p3P2/2PBB3/kPQ3PP/3R2K1 w - - 0 1", "rn3r1k/pp4pp/2p1Q2N/q2np3/4N2P/2PP4/PP3P2/R3K2R w KQ - 0 1", "r2rkn2/1R1N2p1/2p1B2p/4p1PP/p1P2b2/5P2/P3K3/1R6 w - - 0 1", "2r4k/p2R4/1p2P3/5p2/3PbbpP/BP6/P1r5/4Q1K1 b - - 0 1" ] engine = ChessEngine(executable, limit = chess.engine.Limit(depth = 15), log = True) waitForData(engine) for item in sorted(engine.idDict): print(' {} : {}'.format(item, engine.idDict[item])) print('Options:') for opt in sorted(engine.optionsDict): print(' {} : {}'.format(opt, engine.optionsDict[opt])) for fen in fenList: engine.uciNewGame(fen = fen) print('------------------------- startPlay -------------------------') engine.startPlay() waitForData(engine) printPlayResult(engine) print('----------------------- startAnalysis -----------------------') engine.startAnalysis(multiPV = 1) waitForData(engine) printPlayResult(engine) import sys from eco import ECODatabase def scoreECOTable(): global executable, basename, depth filename = '{}.tsv'.format(basename) print('Scoring {} -------------'.format(filename)) eco = ECODatabase(tsvPattern = filename) fenList = eco.fen2Id().keys() engine = ChessEngine(executable, limit = chess.engine.Limit(depth = depth), timeout_msec = 300, log = True) waitForData(engine) nTotal = len(fenList) fen2score = ['\t'.join(['fen',executable])] scmin, scmean, scmax = sys.maxsize//2, 0, -(sys.maxsize//2) for n, fen in enumerate(fenList): engine.startAnalysis(multiPV = 1) waitForData(engine) score = int(str(engine.playResult.info['score'].relative)) scmin = min(scmin, score) scmean += score/nTotal scmax = max(scmax, score) fen2score.append('\t'.join([fen, str(score)])) print(' {} of {} completed'.format(n+1, nTotal)) print('Scores {} <= {:.1f} <= {}'.format(scmin, scmean, scmax)) fscFile = os.path.join(eco.ecoDirectory, '{}.fsc'.format(basename)) with open(fscFile, mode='w') as f: f.write('\n'.join(fen2score)) print('Result filed @ {}'.format(fscFile)) app = QtCore.QCoreApplication(sys.argv) QtCore.QTimer.singleShot(10, runChessEngine) # QtCore.QTimer.singleShot(10, scoreECOTable) app.exec() print('completed')