from typing import Optional, Dict, Any, Union, Literal, Mapping, overload, Sequence, Iterator, AsyncIterator, TYPE_CHECKING, Type, TypeVar, cast
import inspect
import warnings
from typing_extensions import TypedDict # Use typing.TypedDict in Python >= 3.8
from ...streaming import Stream, AsyncStream
from ..._resource import APIResource, AsyncAPIResource
if TYPE_CHECKING:
from ..._client import VeniceClient
from ...exceptions import InvalidRequestError, MissingStreamClassError, APIResponseProcessingError
from ...types.chat import (
MessageParam, VeniceParameters, ResponseFormat, UsageData,
ChatCompletionChoice, ChatCompletion, ChatCompletionChunk,
Tool, ToolChoice, ToolChoiceObject, StreamOptions, ChunkModelFactory
)
# Re-export types for backwards compatibility
__all__ = [
"ChatCompletions",
"AsyncChatCompletions"
]
# --- Resource Class ---
[docs]
class ChatCompletions(APIResource):
"""
Provides access to chat completion operations.
This class manages synchronous chat completion operations with Venice AI models,
supporting both standard (non-streaming) and streaming response formats. It serves
as the primary interface for chat-based interactions with Venice AI language models.
The class handles parameter validation, request formation, and response parsing
for chat completion requests.
:param _client: The client instance used to make API requests.
:type _client: venice_ai._client.VeniceClient
Example:
.. code-block:: python
from venice_ai import VeniceClient
# Initialize the client
client = VeniceClient(api_key="your-api-key")
# Create a chat completion
response = client.chat.completions.create(
model="venice-1",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Tell me about Venice AI."}
]
)
# Access the response content
print(response["choices"][0]["message"]["content"])
"""
@overload
def create(
self,
*,
model: str,
messages: Sequence[MessageParam],
stream: Literal[False] = False, # Explicit non-streaming case
# --- Common Optional Parameters ---
frequency_penalty: Optional[float] = None,
max_tokens: Optional[int] = None, # Deprecated. Please use max_completion_tokens instead.
max_completion_tokens: Optional[int] = None,
n: Optional[int] = None,
presence_penalty: Optional[float] = None,
response_format: Optional[ResponseFormat] = None,
seed: Optional[int] = None,
stop: Optional[Union[str, Sequence[str]]] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
tools: Optional[Sequence[Tool]] = None,
tool_choice: Optional[Union[Literal["none", "auto"], ToolChoiceObject]] = None,
user: Optional[str] = None, # Discarded but supported for OpenAI compat
venice_parameters: Optional[VeniceParameters] = None,
# --- Less Common / Newer Params from Docs ---
logprobs: Optional[bool] = None, # If requesting logprobs (check API if bool or object)
top_logprobs: Optional[int] = None,
parallel_tool_calls: Optional[bool] = None,
repetition_penalty: Optional[float] = None,
stop_token_ids: Optional[Sequence[int]] = None,
top_k: Optional[int] = None,
stream_options: Optional[StreamOptions] = None,
stream_cls: Optional[Type[ChunkModelFactory[ChatCompletionChunk]]] = None,
**kwargs: Any
) -> ChatCompletion: # Return type for non-streaming
...
@overload
def create(
self,
*,
model: str,
messages: Sequence[MessageParam],
stream: Literal[True],
stream_cls: Optional[Type[ChunkModelFactory[ChatCompletionChunk]]] = None,
# --- Common Optional Parameters ---
frequency_penalty: Optional[float] = None,
max_tokens: Optional[int] = None, # Deprecated. Please use max_completion_tokens instead.
max_completion_tokens: Optional[int] = None,
n: Optional[int] = None,
presence_penalty: Optional[float] = None,
response_format: Optional[ResponseFormat] = None,
seed: Optional[int] = None,
stop: Optional[Union[str, Sequence[str]]] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
tools: Optional[Sequence[Tool]] = None,
tool_choice: Optional[Union[Literal["none", "auto"], ToolChoiceObject]] = None,
user: Optional[str] = None,
venice_parameters: Optional[VeniceParameters] = None,
# --- Less Common / Newer Params ---
logprobs: Optional[bool] = None,
top_logprobs: Optional[int] = None,
parallel_tool_calls: Optional[bool] = None,
repetition_penalty: Optional[float] = None,
stop_token_ids: Optional[Sequence[int]] = None,
top_k: Optional[int] = None,
stream_options: Optional[StreamOptions] = None,
**kwargs: Any
) -> Iterator[ChatCompletionChunk]: # Return type for streaming (iterator of dicts)
...
[docs]
def create(
self,
*,
model: str,
messages: Sequence[MessageParam],
stream: bool = False,
stream_cls: Optional[Type[ChunkModelFactory[ChatCompletionChunk]]] = None,
**kwargs: Any # Catch all other keyword args
) -> Union[ChatCompletion, Iterator[ChatCompletionChunk]]:
"""
Create a model response for the given chat conversation.
This method handles the core functionality of the chat completions API, allowing
for both synchronous and streaming responses. It sends the provided messages
and parameters to the Venice AI API and returns either a complete response or
a stream of partial responses.
The method automatically formats the request body, applies appropriate defaults,
and routes the request to either the standard or streaming endpoint based on
the ``stream`` parameter.
:param model: ID of the model to use (e.g., ``"venice-1"``, ``"llama-3.3-70b"``).
:type model: str
:param messages: Sequence of messages forming the conversation.
:type messages: Sequence[venice_ai.types.chat.MessageParam]
:param stream: If ``True``, stream back partial progress. Defaults to ``False``.
Returns an ``Iterator[ChatCompletionChunk]`` if ``True``, otherwise ``ChatCompletion``.
:type stream: bool
:param stream_cls: Optional stream wrapper class for streaming responses. Must conform to the ChunkModelFactory protocol.
:type stream_cls: Optional[Type[venice_ai.types.chat.ChunkModelFactory[venice_ai.types.chat.ChatCompletionChunk]]]
:param frequency_penalty: Number between -2.0 and 2.0. Positive values penalize new tokens based on their existing frequency in the text so far.
:type frequency_penalty: Optional[float]
:param max_tokens: Deprecated. Please use ``max_completion_tokens`` instead.
The maximum number of tokens that can be generated in the chat completion.
The total length of input tokens and generated tokens is limited by the model's context length.
:type max_tokens: Optional[int]
:param max_completion_tokens: Maximum number of tokens that can be generated in the chat completion.
:type max_completion_tokens: Optional[int]
:param n: Number of chat completion choices to generate for each input message.
:type n: Optional[int]
:param presence_penalty: Number between -2.0 and 2.0. Positive values penalize new tokens based on whether they appear in the text so far.
:type presence_penalty: Optional[float]
:param response_format: Specifies the format that the model must output (e.g., for JSON mode).
:type response_format: Optional[venice_ai.types.chat.ResponseFormat]
:param seed: Random seed for reproducible outputs.
:type seed: Optional[int]
:param stop: Up to 4 sequences where the API will stop generating further tokens.
:type stop: Optional[Union[str, Sequence[str]]]
:param temperature: Sampling temperature between 0.0 and 2.0. Higher values make output more random, lower values more focused and deterministic. Defaults to 0.7.
:type temperature: Optional[float]
:param top_p: Nucleus sampling parameter between 0.0 and 1.0. Defaults to 1.0.
:type top_p: Optional[float]
:param tools: List of tools the model may call.
:type tools: Optional[Sequence[venice_ai.types.chat.Tool]]
:param tool_choice: Controls which (if any) tool is called by the model. Can be ``"none"``, ``"auto"``, or a specific tool.
:type tool_choice: Optional[Union[Literal["none", "auto"], venice_ai.types.chat.ToolChoiceObject]]
:param user: Unique identifier representing your end-user (discarded by API but supported for OpenAI compatibility).
:type user: Optional[str]
:param venice_parameters: Venice-specific parameters for fine-tuning model behavior.
:type venice_parameters: Optional[venice_ai.types.chat.VeniceParameters]
:param logprobs: Whether to return log probabilities of the output tokens.
:type logprobs: Optional[bool]
:param top_logprobs: Number of most likely tokens to return at each token position if ``logprobs`` is ``True``.
:type top_logprobs: Optional[int]
:param parallel_tool_calls: Whether to enable parallel function calling during tool use.
:type parallel_tool_calls: Optional[bool]
:param repetition_penalty: Penalty for token repetition.
:type repetition_penalty: Optional[float]
:param stop_token_ids: List of token IDs at which to stop generation.
:type stop_token_ids: Optional[Sequence[int]]
:param top_k: Number of highest probability vocabulary tokens to keep for top-k-filtering.
:type top_k: Optional[int]
:param stream_options: Additional options for controlling streaming behavior.
:type stream_options: Optional[venice_ai.types.chat.StreamOptions]
:param logit_bias: Modify the likelihood of specified tokens appearing in the completion. Accepts a JSON object that maps tokens (specified by their token ID in the tokenizer) to an associated bias value from -100 to 100.
:type logit_bias: Optional[Dict[str, int]]
:param kwargs: Additional keyword arguments.
:return: A :class:`~venice_ai.types.chat.ChatCompletion` if ``stream`` is ``False``,
otherwise an ``Iterator`` of :class:`~venice_ai.types.chat.ChatCompletionChunk`.
:rtype: Union[venice_ai.types.chat.ChatCompletion, Iterator[venice_ai.types.chat.ChatCompletionChunk]]
:raises venice_ai.exceptions.InvalidRequestError: If parameters are invalid or malformed.
:raises venice_ai.exceptions.AuthenticationError: If the API key is invalid or missing.
:raises venice_ai.exceptions.PermissionDeniedError: If access is denied to the requested model or feature.
:raises venice_ai.exceptions.NotFoundError: If the model or resource is not found.
:raises venice_ai.exceptions.RateLimitError: If rate limits are exceeded for the account.
:raises venice_ai.exceptions.APIError: For other API-related errors not covered by specific exceptions.
Example:
.. code-block:: python
# Non-streaming usage with system and user messages
from venice_ai import VeniceClient
client = VeniceClient(api_key="your-api-key")
response = client.chat.completions.create(
model="llama-3.3-70b",
messages=[
{"role": "system", "content": "You are a helpful assistant specializing in Python."},
{"role": "user", "content": "Write a function to calculate the Fibonacci sequence."}
],
temperature=0.3 # More deterministic/focused response
)
print(response["choices"][0]["message"]["content"])
# Streaming usage with progress display
for chunk in client.chat.completions.create(
model="venice-1",
messages=[{"role": "user", "content": "Explain quantum computing briefly."}],
stream=True,
max_completion_tokens=250 # Limit response length
):
content = chunk["choices"][0]["delta"].get("content", "")
if content:
print(content, end="", flush=True)
# Using tools/function calling
response = client.chat.completions.create(
model="llama-3.3-70b",
messages=[{"role": "user", "content": "What's the weather in New York?"}],
tools=[{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get current weather for a location",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string", "description": "City name"},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
},
"required": ["location"]
}
}
}]
)
"""
# Logic to handle max_tokens deprecation and precedence
actual_max_completion_tokens: Optional[int] = kwargs.pop("max_completion_tokens", None)
deprecated_max_tokens: Optional[int] = kwargs.pop("max_tokens", None)
if actual_max_completion_tokens is not None:
kwargs["max_completion_tokens"] = actual_max_completion_tokens
if deprecated_max_tokens is not None:
warnings.warn(
"Both `max_tokens` and `max_completion_tokens` were provided. "
"`max_tokens` is deprecated and will be ignored in favor of `max_completion_tokens`.",
DeprecationWarning,
stacklevel=2,
)
elif deprecated_max_tokens is not None:
warnings.warn(
"The `max_tokens` parameter is deprecated. Please use `max_completion_tokens` instead.",
DeprecationWarning,
stacklevel=2,
)
kwargs["max_completion_tokens"] = deprecated_max_tokens
# Construct request body, filtering out None values from kwargs
body: Dict[str, Any] = {
"model": model,
"messages": messages,
"stream": stream, # Set based on the stream parameter
}
# Add optional parameters from kwargs if they are not None
for key, value in kwargs.items():
if value is not None:
body[key] = value
# Handle specific naming or structuring if needed
# e.g. if venice_parameters needs special handling
if stream:
user_provided_stream_cls = stream_cls
effective_stream_cls: Any = Stream # Default to our Stream wrapper
if user_provided_stream_cls is not None:
# Check if the user_provided_stream_cls is a class and callable (basic check)
if inspect.isclass(user_provided_stream_cls):
try:
# First check if it's a subclass of our known stream types
if issubclass(user_provided_stream_cls, (Stream, AsyncStream)):
effective_stream_cls = cast(Any, user_provided_stream_cls)
else:
# For custom classes, check if they have the proper interface
# They should have __init__ with iterator and client params, and __iter__ method
sig = inspect.signature(user_provided_stream_cls.__init__)
params = list(sig.parameters.keys())
has_proper_signature = len(params) >= 3 or 'client' in params
has_iter_method = hasattr(user_provided_stream_cls, '__iter__')
if has_proper_signature and has_iter_method:
effective_stream_cls = cast(Any, user_provided_stream_cls)
# else: incompatible, use default
except (TypeError, ValueError):
# If we can't inspect the signature, fall back to default
pass # effective_stream_cls remains Stream
# else: it's not a class, so use default.
raw_iterator: Iterator[ChatCompletionChunk] = self._client._stream_request(
method="POST",
path="chat/completions",
json_data=body,
cast_to=ChatCompletionChunk
)
return effective_stream_cls(raw_iterator, client=self._client)
else:
# Use regular post method for non-streaming responses
response = self._client.post("chat/completions", json_data=body, cast_to=ChatCompletion)
# The response is now cast by the client to ChatCompletion
return response
# --- Async Resource Class ---
[docs]
class AsyncChatCompletions(AsyncAPIResource):
"""
Provides access to asynchronous chat completion operations.
This class manages asynchronous chat completion operations with Venice AI models,
supporting both standard (non-streaming) and streaming response formats. It serves
as the primary interface for chat-based interactions with Venice AI language models
in asynchronous contexts.
The class handles parameter validation, request formation, and response parsing
for asynchronous chat completion requests.
:param _client: The client instance used to make API requests.
:type _client: venice_ai._async_client.AsyncVeniceClient
Example:
.. code-block:: python
from venice_ai import AsyncVeniceClient
import asyncio
async def main():
# Initialize the async client
client = AsyncVeniceClient(api_key="your-api-key")
# Create a chat completion asynchronously
response = await client.chat.completions.create(
model="venice-1",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Tell me about Venice AI."}
]
)
# Access the response content
print(response["choices"][0]["message"]["content"])
# Run the async function
asyncio.run(main())
"""
@overload
async def create(
self,
*,
model: str,
messages: Sequence[MessageParam],
stream: Literal[False] = False, # Explicit non-streaming case
# --- Common Optional Parameters ---
frequency_penalty: Optional[float] = None,
max_tokens: Optional[int] = None, # Deprecated. Please use max_completion_tokens instead.
max_completion_tokens: Optional[int] = None,
n: Optional[int] = None,
presence_penalty: Optional[float] = None,
response_format: Optional[ResponseFormat] = None,
seed: Optional[int] = None,
stop: Optional[Union[str, Sequence[str]]] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
tools: Optional[Sequence[Tool]] = None,
tool_choice: Optional[Union[Literal["none", "auto"], ToolChoiceObject]] = None,
user: Optional[str] = None, # Discarded but supported for OpenAI compat
venice_parameters: Optional[VeniceParameters] = None,
# --- Less Common / Newer Params from Docs ---
logprobs: Optional[bool] = None, # If requesting logprobs (check API if bool or object)
top_logprobs: Optional[int] = None,
parallel_tool_calls: Optional[bool] = None,
repetition_penalty: Optional[float] = None,
stop_token_ids: Optional[Sequence[int]] = None,
top_k: Optional[int] = None,
stream_options: Optional[StreamOptions] = None,
stream_cls: Optional[Type[ChunkModelFactory[ChatCompletionChunk]]] = None,
# min_temp, max_temp - Check if these are standard or venice specific
# stream_options - Handled by stream=True overload
# Extra arguments are ignored for now, could add **kwargs
) -> ChatCompletion: # Return type for non-streaming
...
@overload
async def create(
self,
*,
model: str,
messages: Sequence[MessageParam],
stream: Literal[True],
stream_cls: Optional[Type[ChunkModelFactory[ChatCompletionChunk]]] = None,
# --- Common Optional Parameters ---
frequency_penalty: Optional[float] = None,
max_tokens: Optional[int] = None, # Deprecated. Please use max_completion_tokens instead.
max_completion_tokens: Optional[int] = None,
n: Optional[int] = None,
presence_penalty: Optional[float] = None,
response_format: Optional[ResponseFormat] = None,
seed: Optional[int] = None,
stop: Optional[Union[str, Sequence[str]]] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
tools: Optional[Sequence[Tool]] = None,
tool_choice: Optional[Union[Literal["none", "auto"], ToolChoiceObject]] = None,
user: Optional[str] = None,
venice_parameters: Optional[VeniceParameters] = None,
# --- Less Common / Newer Params ---
logprobs: Optional[bool] = None,
top_logprobs: Optional[int] = None,
parallel_tool_calls: Optional[bool] = None,
repetition_penalty: Optional[float] = None,
stop_token_ids: Optional[Sequence[int]] = None,
top_k: Optional[int] = None,
stream_options: Optional[StreamOptions] = None,
) -> AsyncIterator[ChatCompletionChunk]: # Return type for streaming (async iterator of dicts)
...
[docs]
async def create(
self,
*,
model: str,
messages: Sequence[MessageParam],
stream: bool = False,
stream_cls: Optional[Type[ChunkModelFactory[ChatCompletionChunk]]] = None,
**kwargs: Any # Catch all other keyword args
) -> Union[ChatCompletion, AsyncIterator[ChatCompletionChunk]]:
"""
Create a model response for the given chat conversation asynchronously.
This method handles the core functionality of the chat completions API, allowing
for both synchronous and streaming responses in async contexts. It sends the provided
messages and parameters to the Venice AI API and returns either a complete response or
a stream of partial responses.
The method automatically formats the request body, applies appropriate defaults,
and routes the request to either the standard or streaming endpoint based on
the ``stream`` parameter.
:param model: ID of the model to use (e.g., ``"venice-1"``, ``"llama-3.3-70b"``).
:type model: str
:param messages: Sequence of messages forming the conversation.
:type messages: Sequence[venice_ai.types.chat.MessageParam]
:param stream: If ``True``, stream back partial progress. Defaults to ``False``.
Returns an ``AsyncIterator[ChatCompletionChunk]`` if ``True``, otherwise ``ChatCompletion``.
:type stream: bool
:param stream_cls: Optional stream wrapper class for streaming responses. Must conform to the ChunkModelFactory protocol.
:type stream_cls: Optional[Type[venice_ai.types.chat.ChunkModelFactory[venice_ai.types.chat.ChatCompletionChunk]]]
:param frequency_penalty: Number between -2.0 and 2.0. Positive values penalize new tokens based on their existing frequency in the text so far.
:type frequency_penalty: Optional[float]
:param max_tokens: Deprecated. Please use ``max_completion_tokens`` instead.
The maximum number of tokens that can be generated in the chat completion.
The total length of input tokens and generated tokens is limited by the model's context length.
:type max_tokens: Optional[int]
:param max_completion_tokens: Maximum number of tokens that can be generated in the chat completion.
:type max_completion_tokens: Optional[int]
:param n: Number of chat completion choices to generate for each input message.
:type n: Optional[int]
:param presence_penalty: Number between -2.0 and 2.0. Positive values penalize new tokens based on whether they appear in the text so far.
:type presence_penalty: Optional[float]
:param response_format: Specifies the format that the model must output (e.g., for JSON mode).
:type response_format: Optional[venice_ai.types.chat.ResponseFormat]
:param seed: Random seed for reproducible outputs.
:type seed: Optional[int]
:param stop: Up to 4 sequences where the API will stop generating further tokens.
:type stop: Optional[Union[str, Sequence[str]]]
:param temperature: Sampling temperature between 0.0 and 2.0. Higher values make output more random, lower values more focused and deterministic. Defaults to 0.7.
:type temperature: Optional[float]
:param top_p: Nucleus sampling parameter between 0.0 and 1.0. Defaults to 1.0.
:type top_p: Optional[float]
:param tools: List of tools the model may call.
:type tools: Optional[Sequence[venice_ai.types.chat.Tool]]
:param tool_choice: Controls which (if any) tool is called by the model. Can be ``"none"``, ``"auto"``, or a specific tool.
:type tool_choice: Optional[Union[Literal["none", "auto"], venice_ai.types.chat.ToolChoiceObject]]
:param user: Unique identifier representing your end-user (discarded by API but supported for OpenAI compatibility).
:type user: Optional[str]
:param venice_parameters: Venice-specific parameters for fine-tuning model behavior.
:type venice_parameters: Optional[venice_ai.types.chat.VeniceParameters]
:param logprobs: Whether to return log probabilities of the output tokens.
:type logprobs: Optional[bool]
:param top_logprobs: Number of most likely tokens to return at each token position if ``logprobs`` is ``True``.
:type top_logprobs: Optional[int]
:param parallel_tool_calls: Whether to enable parallel function calling during tool use.
:type parallel_tool_calls: Optional[bool]
:param repetition_penalty: Penalty for token repetition.
:type repetition_penalty: Optional[float]
:param stop_token_ids: List of token IDs at which to stop generation.
:type stop_token_ids: Optional[Sequence[int]]
:param top_k: Number of highest probability vocabulary tokens to keep for top-k-filtering.
:type top_k: Optional[int]
:param stream_options: Additional options for controlling streaming behavior.
:type stream_options: Optional[venice_ai.types.chat.StreamOptions]
:param logit_bias: Modify the likelihood of specified tokens appearing in the completion. Accepts a JSON object that maps tokens (specified by their token ID in the tokenizer) to an associated bias value from -100 to 100.
:type logit_bias: Optional[Dict[str, int]]
:param kwargs: Additional keyword arguments.
:return: A :class:`~venice_ai.types.chat.ChatCompletion` if ``stream`` is ``False``,
otherwise an ``AsyncIterator`` of :class:`~venice_ai.types.chat.ChatCompletionChunk`.
:rtype: Union[venice_ai.types.chat.ChatCompletion, AsyncIterator[venice_ai.types.chat.ChatCompletionChunk]]
:raises venice_ai.exceptions.InvalidRequestError: If parameters are invalid or malformed.
:raises venice_ai.exceptions.AuthenticationError: If the API key is invalid or missing.
:raises venice_ai.exceptions.PermissionDeniedError: If access is denied to the requested model or feature.
:raises venice_ai.exceptions.NotFoundError: If the model or resource is not found.
:raises venice_ai.exceptions.RateLimitError: If rate limits are exceeded for the account.
:raises venice_ai.exceptions.APIError: For other API-related errors not covered by specific exceptions.
Example:
.. code-block:: python
# Non-streaming async usage
import asyncio
from venice_ai import AsyncVeniceClient
async def main():
client = AsyncVeniceClient(api_key="your-api-key")
response = await client.chat.completions.create(
model="llama-3.3-70b",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Explain async programming in Python."}
],
temperature=0.3
)
print(response["choices"][0]["message"]["content"])
asyncio.run(main())
# Async streaming usage
async def stream_example():
client = AsyncVeniceClient(api_key="your-api-key")
async for chunk in await client.chat.completions.create(
model="venice-1",
messages=[{"role": "user", "content": "Tell me a story."}],
stream=True,
max_completion_tokens=200
):
content = chunk["choices"][0]["delta"].get("content", "")
if content:
print(content, end="", flush=True)
asyncio.run(stream_example())
"""
# Logic to handle max_tokens deprecation and precedence
actual_max_completion_tokens: Optional[int] = kwargs.pop("max_completion_tokens", None)
deprecated_max_tokens: Optional[int] = kwargs.pop("max_tokens", None)
if actual_max_completion_tokens is not None:
kwargs["max_completion_tokens"] = actual_max_completion_tokens
if deprecated_max_tokens is not None:
warnings.warn(
"Both `max_tokens` and `max_completion_tokens` were provided. "
"`max_tokens` is deprecated and will be ignored in favor of `max_completion_tokens`.",
DeprecationWarning,
stacklevel=2,
)
elif deprecated_max_tokens is not None:
warnings.warn(
"The `max_tokens` parameter is deprecated. Please use `max_completion_tokens` instead.",
DeprecationWarning,
stacklevel=2,
)
kwargs["max_completion_tokens"] = deprecated_max_tokens
# Construct request body, filtering out None values from kwargs
body: Dict[str, Any] = {
"model": model,
"messages": messages,
"stream": stream, # Set based on the stream parameter
}
# Add optional parameters from kwargs if they are not None
# Exclude 'stream_cls' from being added to the body if it's in kwargs
# as it's a type and not part of the API request body.
processed_kwargs = {k: v for k, v in kwargs.items() if k != 'stream_cls'}
for key, value in processed_kwargs.items():
if value is not None:
body[key] = value
# Handle specific naming or structuring if needed
# e.g. if venice_parameters needs special handling
if stream:
user_provided_stream_cls_async = stream_cls
effective_stream_cls_async: Any = AsyncStream # Default
if user_provided_stream_cls_async is not None:
if inspect.isclass(user_provided_stream_cls_async):
try:
# First check if it's a subclass of our known stream types
if issubclass(user_provided_stream_cls_async, (Stream, AsyncStream)):
effective_stream_cls_async = cast(Any, user_provided_stream_cls_async)
else:
# For custom classes, check if they have the proper interface
# They should have __init__ with iterator and client params, and __aiter__ method
sig = inspect.signature(user_provided_stream_cls_async.__init__)
params = list(sig.parameters.keys())
has_proper_signature = len(params) >= 3 or 'client' in params
has_aiter_method = hasattr(user_provided_stream_cls_async, '__aiter__')
if has_proper_signature and has_aiter_method:
effective_stream_cls_async = cast(Any, user_provided_stream_cls_async)
# else: incompatible, use default
except (TypeError, ValueError):
# If we can't inspect the signature, fall back to default
pass # effective_stream_cls_async remains AsyncStream
# else: not a class, use default
# else: stream_cls is None, use default
# _stream_request is an async generator function, calling it returns the async generator object.
raw_iterator: AsyncIterator[ChatCompletionChunk] = self._client._stream_request(
method="POST",
path="chat/completions",
json_data=body,
cast_to=ChatCompletionChunk
)
return effective_stream_cls_async(raw_iterator, client=self._client)
else:
# Use regular post method for non-streaming responses
response = await self._client.post("chat/completions", json_data=body, cast_to=ChatCompletion)
# The response is now cast by the client to ChatCompletion
return response