1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
# Build a KafkaConsumer with every keyword argument spelled out explicitly.
#
# `socket` must be imported because the `socket_options` argument below
# references constants from it.
import socket

from kafka import KafkaConsumer

# NOTE(review): `topics` is not defined in this snippet; it is expected to be
# an iterable of topic names supplied by the surrounding code -- confirm.
consumer = KafkaConsumer(
    *topics,
    # --- Broker address and client/group identity ---
    bootstrap_servers='localhost:9092',
    client_id='my-consumer',
    group_id='my-group',
    # --- Deserialization: raw bytes are decoded as UTF-8 text ---
    # NOTE(review): these lambdas call `.decode` unconditionally; presumably a
    # record with a null key or value would fail here -- verify against the
    # data actually being consumed.
    key_deserializer=lambda m: m.decode('utf-8'),
    value_deserializer=lambda m: m.decode('utf-8'),
    # --- Fetch settings ---
    enable_incremental_fetch_sessions=True,
    fetch_min_bytes=1,
    fetch_max_wait_ms=500,
    fetch_max_bytes=52428800,  # 50 * 1024 * 1024
    max_partition_fetch_bytes=1048576,  # 1024 * 1024
    # --- Request timing, retry and reconnect backoff ---
    request_timeout_ms=305000,
    retry_backoff_ms=100,
    reconnect_backoff_ms=50,
    reconnect_backoff_max_ms=30000,
    max_in_flight_requests_per_connection=5,
    # --- Offsets and commits ---
    auto_offset_reset='latest',
    enable_auto_commit=True,
    auto_commit_interval_ms=5000,
    default_offset_commit_callback=None,
    # --- Record validation and visibility ---
    check_crcs=True,
    isolation_level='read_uncommitted',
    # --- Topic metadata ---
    allow_auto_create_topics=True,
    metadata_max_age_ms=300000,
    # --- Group membership and polling ---
    partition_assignment_strategy=None,
    max_poll_records=500,
    max_poll_interval_ms=300000,
    session_timeout_ms=10000,
    heartbeat_interval_ms=3000,
    # --- Socket-level settings ---
    receive_buffer_bytes=None,
    send_buffer_bytes=None,
    socket_options=[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
    # --- Iteration timeout (infinite here) ---
    consumer_timeout_ms=float('inf'),
    # --- Transport security / SSL ---
    security_protocol='PLAINTEXT',
    ssl_context=None,
    ssl_check_hostname=True,
    ssl_cafile=None,
    ssl_certfile=None,
    ssl_keyfile=None,
    ssl_password=None,
    ssl_crlfile=None,
    ssl_ciphers=None,
    # --- Broker API version probing and idle connections ---
    api_version=None,
    api_version_auto_timeout_ms=2000,
    connections_max_idle_ms=540000,
    # --- Metrics ---
    metric_reporters=None,
    metrics_enabled=True,
    metrics_num_samples=2,
    metrics_sample_window_ms=30000,
    # --- Miscellaneous ---
    selector=None,
    exclude_internal_topics=True,
    # --- SASL authentication ---
    sasl_mechanism=None,
    sasl_plain_username=None,
    sasl_plain_password=None,
    sasl_kerberos_name=None,
    sasl_kerberos_service_name='kafka',
    sasl_kerberos_domain_name=None,
    sasl_oauth_token_provider=None,
    # --- Proxy and injected client ---
    socks5_proxy=None,
    kafka_client=None,
)
|