'''Annotation support
.. _fiekas.eco: https://github.com/niklasf/eco
'''
from typing import Callable, List, Union, Optional, Tuple
import sys
import os.path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import MzChess
if MzChess.useQt5():
from PyQt5 import QtCore
else:
from PyQt6 import QtCore
import chess, chess.pgn
from chessengine import ChessEngine, PGNEval_REGEX
[docs]class Annotator():
'''A Annotator class applying
:param name: name of the annotation engine
:param notifyFunction: print-like function used for notification
'''
def __init__(self, name : str, notifyFunction : Optional[Callable[[str], None]] = None) -> None:
self.name = name
self.notifyFunction = notifyFunction
self.posScNagAV : List[Tuple[float, int, bool]] = list()
self.negScNagAV : List[Tuple[float, int, bool]] = list()
def _setClass(self, nagCode : int, score : float, addVariant : bool):
if score > 0:
newList = [(score, nagCode, addVariant)]
for scNagAv in self.posScNagAV:
if scNagAv[1] != nagCode:
newList.append(scNagAv)
self.posScNagAV = sorted(newList, reverse = True, key = lambda el : el[1])
else:
newList = [(score, nagCode, addVariant)]
for scNagAv in self.negScNagAV:
if scNagAv[1] != nagCode:
newList.append(scNagAv)
self.negScNagAV = sorted(newList, reverse = False, key = lambda el : el[1])
# ---------------------------------------------------------------
[docs] def setBlunder(self, score : float, addVariant : bool = True) -> None:
'''Sets the condition for a blunder move (NAG: $4)
:param score: score limit (score < 0 expected)
:param addVariant: add a variant in below score
'''
if score > 0:
score = -score
self._setClass(chess.pgn.NAG_BLUNDER, score, addVariant)
[docs] def setDubiousMove(self, score : float, addVariant : bool = False) -> None:
'''Sets the condition for a dubious move (NAG: $6)
:param score: score limit (score < 0 expected)
:param addVariant: add a variant in below score
'''
if score > 0:
score = -score
self._setClass(chess.pgn.NAG_DUBIOUS_MOVE, score, addVariant)
[docs] def setPoorMove(self, score : float, addVariant : bool = False) -> None:
'''Sets the condition for a poor move (NAG: $2)
:param score: score limit (score < 0 expected)
:param addVariant: add a variant in below score
'''
if score > 0:
score = -score
self._setClass(chess.pgn.NAG_MISTAKE, score, addVariant)
[docs] def setBrillantMove(self, score : float, addVariant : bool = False) -> None:
'''Sets the condition for a brilliant move (NAG: $3)
:param score: score limit (score > 0 expected)
:param addVariant: add a variant in below score
'''
if score < 0:
score = -score
self._setClass(chess.pgn.NAG_BRILLIANT_MOVE, score, addVariant)
[docs] def setSpeculativeMove(self, score : float, addVariant : bool = False) -> None:
'''Sets the condition for a speculative move (NAG: $5)
:param score: score limit (score > 0 expected)
:param addVariant: add a variant in below score
'''
if score < 0:
score = -score
self._setClass(chess.pgn.NAG_SPECULATIVE_MOVE, score, addVariant)
[docs] def setGoodMove(self, score : float, addVariant : bool = False) -> None:
'''Sets the condition for a good move (NAG: $1)
:param score: score limit (score > 0 expected)
:param addVariant: add a variant in below score
'''
if score < 0:
score = -score
self._setClass(chess.pgn.NAG_GOOD_MOVE, score, addVariant)
# ---------------------------------------------------------------
def _nagAv(self, whiteScore : str, lastWhiteScore : str, turn):
if whiteScore is not None and len(whiteScore) > 0:
try:
score = float(whiteScore)
except:
score = 1001 - float(whiteScore[1:])
try:
lastScore = float(lastWhiteScore)
except:
lastScore = 1001 - float(lastWhiteScore[1:])
if not turn:
score = -score
lastScore = -lastScore
if score > 0 and len(self.posScNagAV) > 0:
for sc, nag, av in self.posScNagAV:
if lastScore - score > sc:
return ([nag], av)
elif score < 0 and len(self.negScNagAV) > 0:
for sc, nag, av in self.negScNagAV:
if lastScore - score < sc:
return ([nag], av)
return (list(), False)
def _replaceScore(self, comment : str, newWhiteScore : float):
while True:
match = PGNEval_REGEX.search(comment)
if match is None:
break
comment = comment.replace(match.group(0),'')
comment = '[%eval {}] {}'.format(newWhiteScore, comment)
return comment
[docs] @staticmethod
def remove(game : chess.pgn.Game, comments : bool = False, variants : bool = False) -> chess.pgn.Game:
'''Remove certain items from a game
:param game: game to process
:param comments: If True, remove all comments
:param variants: If True, remove all variants
'''
gameNode = game
while gameNode is not None:
if comments:
gameNode.comment = ''
if variants and len(gameNode.variations) > 0:
gameNode.variations = [gameNode.variations[0]]
gameNode = gameNode.next()
return game
[docs] def apply(self,
game : Union[chess.pgn.Game, chess.pgn.GameNode] = chess.pgn.Game(),
scoreListList : List[List[float]] = list(),
pvListList : Optional[List[List[List[chess.Move]]]] = None,
forceHints : bool = False) -> bool:
'''Apply the results of AnnotateEngine.run to a game
:param game: game or gameNode where annotation starts (required)
:param scoreListList: for each move a list of scores for each variant, see AnnotateEngine.scoreListList
:param pvListList: for each move a list of lists of moves for each variant, see AnnotateEngine.pvListList
:param forceHints: force the creation of variants independent of the setXX definitions
:return: boolean indicating whether any hints are added
'''
assert len(scoreListList) > 0, 'Empty scoreListList detected'
if pvListList is not None:
if len(pvListList) == 0:
pvListList = None
else:
assert len(scoreListList) == len(pvListList), 'len(scoreListList) == len(pvListList) required'
if isinstance(game, chess.pgn.Game):
gameNode = game.next()
if len(self.name) > 0:
game.headers['Annotator'] = self.name
else:
gameNode = game
lastWsc = 0
pvList = None
anyHintsAdded = False
for plyID, scoreList in enumerate(scoreListList):
if gameNode is None:
break
if pvListList is not None and plyID > 0:
pvList = pvListList[plyID - 1]
else:
pvList = None
wsc = scoreList[0]
nag, av = self._nagAv(wsc, lastWsc, gameNode.turn())
lastWsc = wsc
gameNode.nags = nag
gameNode.comment = self._replaceScore(gameNode.comment, wsc)
addHints = (av or forceHints) and pvList is not None
if self.notifyFunction is not None:
self.notifyFunction('{}. {}: score = {}, nags = {}'.format(plyID, gameNode.move, wsc, nag))
if addHints:
anyHintsAdded = True
for n, score in enumerate(scoreList):
if self.notifyFunction is not None:
self.notifyFunction('--> {}'.format(pvList[n]))
gameNode.parent.add_line(pvList[n])
gameNode = gameNode.next()
return anyHintsAdded
[docs]class AnnotateEngine(QtCore.QObject):
'''A wrapper class collecting score and variant (pv) data from an engine
:param notifyFunction: print-like function used for notification
'''
def __init__(self,
notifyFunction : Optional[Callable[[str], None]] = None,
parent : Optional[QtCore.QObject] = None) -> None:
super(AnnotateEngine, self).__init__(parent)
self.notifyFunction = notifyFunction
[docs] def setup(self,
engine : ChessEngine,
hintPLYs : int = 0,
multiPV : int = 1) -> None:
'''Setup for operation
:param engine: engine used for annotation
:param hintPLYs: number of half moves (plys) in variants. Suppress hints by setting hintPLYs == 0
:param multiPV: number of variants
'''
assert hintPLYs >= 0
assert multiPV > 0
self.halfMoveID = 0
self.engine = engine
self.engine.bestMoveScoreSignal.connect(self._bestMoveScoreAvailable)
self.hintPLYs = hintPLYs
self.multiPV = multiPV
@QtCore.pyqtSlot(chess.Move, str)
def _bestMoveScoreAvailable(self, move : chess.Move, score : str):
if len(score) == 0:
score = None
if self.notifyFunction is not None:
if self.gameNode.move is not None:
san = self.gameNode.san()
else:
san = None
if not self.gameNode.turn():
moveText = '{}. {}'.format(self.halfMoveID//2 + 1, san)
else:
moveText = '... {}'.format(san)
self.notifyFunction('{}: score = {}'.format(moveText, score))
self.halfMoveID += 1
scoreList = list()
pvList = list()
if not isinstance(self.engine.playResult.info, list):
self.engine.playResult.info = [self.engine.playResult.info]
for hintID, info in enumerate(self.engine.playResult.info):
if self.hintPLYs > 0:
if 'pv' in info:
pvList.append(info['pv'][:self.hintPLYs])
else:
pvList.append([])
scoreList.append(self.engine.getScore(hintID = hintID))
self.scoreListList.append(scoreList)
if self.hintPLYs > 0:
self.pvListList.append(pvList)
if (self.numberOfPlys is not None and len(self.scoreListList) >= self.numberOfPlys) or not self._startNext():
return
def _startNext(self, isNew : bool = False) -> bool:
if not isNew:
self.gameNode = self.gameNode.next()
if self.gameNode is None:
return False
self.engine.uciNewGame(fen = self.gameNode.board().fen())
return self.engine.startAnalysis(multiPV = self.multiPV)
[docs] def run(self, game : Union[chess.pgn.Game, chess.pgn.GameNode], numberOfPlys : Optional[int] = None) -> bool:
'''Runs the engine for a whole game or a gameNode
:param game: game or gameNode
:param numberOfPlys: number of half moves to analyse, ``None`` means analysis of the rest of the game
:returns: True, if successful
'''
if isinstance(game, chess.pgn.Game):
self.gameNode = game.next()
else:
self.gameNode = game
self.numberOfPlys = numberOfPlys
self.scoreListList = list()
if self.hintPLYs > 0:
self.pvListList = list()
else:
self.pvListList = None
if not self._startNext(isNew = True):
return False
while (self.numberOfPlys is None or len(self.scoreListList) < self.numberOfPlys) and self.gameNode is not None:
QtCore.QCoreApplication.processEvents()
return True
if __name__ == "__main__":
import os, sys
import pickle
import argparse
import configparser
import chess.pgn
import configureEngine
from pgnParse import read_game
app = QtCore.QCoreApplication(sys.argv)
fileDirectory = os.path.dirname(os.path.abspath(__file__))
os.chdir(fileDirectory)
parser = argparse.ArgumentParser(description='PGN Parser: test')
parser.add_argument("pgnFile", help = "PGN-File")
parser.add_argument("--encoding", metavar = 'encoding', default = 'u',
choices=['u', 'utf-8-sig', 'i', 'iso-8859-1', 'a', 'ascii'],
help="target of parsing (g - games, h - headers, s - skip every second game, b - board, l - lexical analysis only)")
parser.add_argument("--gameID", metavar = 'gameID', type = int, default=0, help="ID of the game to be used")
parser.add_argument("--plyID", metavar = 'plyID', type = int, default=0, help="ID of the game to be used")
parser.add_argument("--multiPV", metavar = 'multiPV', type = int, default=1, help="ID of the game to be used")
parser.add_argument("--engine", metavar = 'engine', type = str, help="ID of the game to be used")
parser.add_argument("-debug", action = 'store_true', default = False, help = "Enable Debugging")
args = parser.parse_args()
assert args.pgnFile is not None, 'pgnFile is required'
encoding = args.encoding[0]
if encoding == 'u':
encoding = 'utf-8-sig'
elif encoding == 'i':
encoding = 'iso-8859-1'
elif encoding == 'a':
encoding = 'ascii'
settingsFile = os.path.join(fileDirectory, 'settings.ini')
settings = configparser.ConfigParser(delimiters=['='], allow_no_value=True)
settings.optionxform = str
settings.read(settingsFile, encoding = 'utf-8')
engineDict = configureEngine.loadEngineSettings(settings)
if args.engine is None:
selectedEngine = settings['Menu/Engine']['selectedEngine']
else:
selectedEngine = args.engine
assert selectedEngine in engineDict, 'Unexpected engine {} (must be out of {})'.format(selectedEngine, engineDict)
executable = engineDict[selectedEngine]
engine = ChessEngine(executable, limit = chess.engine.Limit(depth = 15), log = False)
_, ext = os.path.splitext(args.pgnFile)
assert ext in ['.ppgn', '.pgn'], 'Unexpected file type {} of {}'.format(ext, args.pgnFile)
if ext == '.ppgn':
with open(args.pgnFile, mode = 'rb') as f:
gameList = pickle.load(f)
game = gameList[args.gameID]
else:
pgn = open(args.pgnFile, mode = 'r', encoding = encoding)
for n in range(args.gameID + 1):
game = read_game(pgn)
annotateEngine = AnnotateEngine(notifyFunction = print)
annotateEngine.setup(engine, hintPLYs = 3, multiPV = args.multiPV)
if args.plyID <= 0:
rc = annotateEngine.run(game)
gameNode = game
forceHints = False
else:
gameNode = game
for n in range(args.plyID):
gameNode = gameNode.next()
rc = annotateEngine.run(gameNode.parent, numberOfPlys = 2)
forceHints = True
if rc:
annotator = Annotator(selectedEngine, notifyFunction = print)
annotator.setBlunder(1.0, addVariant = True)
annotator.apply(game = gameNode,
scoreListList = annotateEngine.scoreListList,
pvListList = annotateEngine.pvListList,
forceHints = forceHints)
exporter = chess.pgn.StringExporter(headers=True, variations=True, comments=True)
pgnString = game.accept(exporter)
print(pgnString)
base, _ = os.path.splitext(args.pgnFile)
with open('{}_new.pgn'.format(base), "w", encoding = 'iso-8859-1') as f:
f.write(pgnString)
app.exec()
print('completed')